Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.regex.Pattern;
6

    
7
import com.google.common.base.Function;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Lists;
10
import com.mongodb.BasicDBObject;
11
import com.mongodb.DBObject;
12
import com.mongodb.client.FindIterable;
13
import com.mongodb.client.MongoCollection;
14
import com.mongodb.client.model.Sorts;
15
import eu.dnetlib.enabling.resultset.ResultSet;
16
import eu.dnetlib.enabling.resultset.ResultSetAware;
17
import eu.dnetlib.enabling.resultset.ResultSetListener;
18
import eu.dnetlib.miscutils.maps.ConcurrentSizedMap;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.bson.conversions.Bson;
22

    
23
import static com.mongodb.client.model.Filters.*;
24

    
25
public class MongoResultSetListener implements ResultSetListener, ResultSetAware {
26

    
27
	private static final Log log = LogFactory.getLog(MongoResultSetListener.class);
28

    
29
	private ConcurrentSizedMap<Integer, String> lastKeys = new ConcurrentSizedMap<>();
30
	private Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("id"));
31

    
32
	private Function<DBObject, String> serializer;
33
	private MongoCollection<DBObject> collection;
34
	private Bson query;
35

    
36
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Long from, final Long until, final Pattern filter, final Function<DBObject, String> serializer) {
37
		this.collection = collection;
38
		this.serializer = serializer;
39
		this.query = query(from, until, filter);
40
	}
41

    
42
	@Override
43
	public List<String> getResult(final int fromPosition, final int toPosition) {
44

    
45
		ArrayList<DBObject> page = null;
46

    
47
		String lastKey = lastKeys.get(fromPosition);
48
		if (lastKey != null) {
49
			page = continueFrom(lastKey, (toPosition - fromPosition) + 1);
50
		} else {
51
			page = fetchNew(fromPosition - 1, (toPosition - fromPosition) + 1);
52
		}
53

    
54
		if (!page.isEmpty()) {
55
			DBObject last = page.get(page.size() - 1);
56
			lastKeys.put(toPosition + 1, (String) last.get("id"));
57
		}
58

    
59
		if (log.isDebugEnabled()) {
60
			log.info(String.format("got %s records from %s to %s", page.size(), fromPosition, toPosition));
61
		}
62

    
63
		return Lists.newArrayList(Iterables.transform(page, serializer));
64
	}
65

    
66
	private ArrayList<DBObject> fetchNew(final int from, final int size) {
67
		final FindIterable<DBObject> it = collection.find(query).batchSize(size);
68
		return Lists.newArrayList(it.sort(sortByIdAsc).skip(from).limit(size));
69
	}
70

    
71
	private ArrayList<DBObject> continueFrom(final String lastKey, final int size) {
72
		if (log.isDebugEnabled()) {
73
			log.debug("trying to continue from previous key: " + lastKey);
74
		}
75
		final Bson q = and(query, gt("id", lastKey));
76
		final FindIterable<DBObject> it = collection.find(q).batchSize(size).sort(sortByIdAsc).limit(size);
77
		return Lists.newArrayList(it);
78
	}
79

    
80
	private Bson query(final Long from, final Long until, final Pattern pattern) {
81
		final Bson dateFilter = dateQuery(from, until);
82
		final Bson regexFilter = regexQuery(pattern);
83
		if (dateFilter != null & regexFilter != null) {
84
			return and(dateFilter, regexFilter);
85
		} else if (dateFilter != null) {
86
			return dateFilter;
87
		} else if (regexFilter != null) {
88
			return regexFilter;
89
		}
90
		return new BasicDBObject();
91
	}
92

    
93
	private Bson dateQuery(final Long from, final Long until) {
94
		if (from != null & until != null) {
95
			return and(gt("timestamp", from), lt("timestamp", until));
96
		}
97
		if (from != null) {
98
			return gt("timestamp", from);
99
		}
100
		if (until != null) {
101
			return lt("timestamp", until);
102
		}
103
		return null;
104
	}
105

    
106
	private Bson regexQuery(final Pattern pattern) {
107
		if (pattern != null) {
108
			return regex("body", pattern);
109
		}
110
		return null;
111
	}
112

    
113
	@Override
114
	public int getSize() {
115
		return (int) collection.count(query);
116
	}
117

    
118
	@Override
119
	public void setResultSet(final ResultSet resultSet) {
120
		resultSet.close();
121
	}
122

    
123
}
(5-5/5)