Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.regex.Pattern;
6

    
7
import com.google.common.base.Function;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Lists;
10
import com.mongodb.BasicDBObject;
11
import com.mongodb.DBObject;
12
import com.mongodb.MongoClient;
13
import com.mongodb.client.FindIterable;
14
import com.mongodb.client.MongoCollection;
15
import com.mongodb.client.model.Sorts;
16
import eu.dnetlib.enabling.resultset.ResultSet;
17
import eu.dnetlib.enabling.resultset.ResultSetAware;
18
import eu.dnetlib.enabling.resultset.ResultSetListener;
19
import eu.dnetlib.miscutils.maps.ConcurrentSizedMap;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.bson.BsonDocument;
23
import org.bson.conversions.Bson;
24

    
25
import static com.mongodb.client.model.Filters.*;
26

    
27
public class MongoResultSetListener implements ResultSetListener, ResultSetAware {
28

    
29
	private static final Log log = LogFactory.getLog(MongoResultSetListener.class);
30

    
31
	private ConcurrentSizedMap<Integer, String> lastKeys = new ConcurrentSizedMap<>();
32
	private Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("id"));
33

    
34
	private Function<DBObject, String> serializer;
35
	private MongoCollection<DBObject> collection;
36
	private Bson query;
37

    
38
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Long from, final Long until, final Pattern filter, final Function<DBObject, String> serializer) {
39
		this.collection = collection;
40
		this.serializer = serializer;
41
		this.query = query(from, until, filter);
42
		log.debug("Query on mongo: "+this.query.toBsonDocument(BsonDocument.class, MongoClient.getDefaultCodecRegistry()));
43
	}
44

    
45
	@Override
46
	public List<String> getResult(final int fromPosition, final int toPosition) {
47

    
48
		ArrayList<DBObject> page = null;
49

    
50
		String lastKey = lastKeys.get(fromPosition);
51
		if (lastKey != null) {
52
			page = continueFrom(lastKey, (toPosition - fromPosition) + 1);
53
		} else {
54
			page = fetchNew(fromPosition - 1, (toPosition - fromPosition) + 1);
55
		}
56

    
57
		if (!page.isEmpty()) {
58
			DBObject last = page.get(page.size() - 1);
59
			lastKeys.put(toPosition + 1, (String) last.get("id"));
60
		}
61

    
62
		if (log.isDebugEnabled()) {
63
			log.info(String.format("got %s records from %s to %s", page.size(), fromPosition, toPosition));
64
		}
65

    
66
		return Lists.newArrayList(Iterables.transform(page, serializer));
67
	}
68

    
69
	private ArrayList<DBObject> fetchNew(final int from, final int size) {
70
		final FindIterable<DBObject> it = collection.find(query).batchSize(size);
71
		return Lists.newArrayList(it.sort(sortByIdAsc).skip(from).limit(size));
72
	}
73

    
74
	private ArrayList<DBObject> continueFrom(final String lastKey, final int size) {
75
		if (log.isDebugEnabled()) {
76
			log.debug("trying to continue from previous key: " + lastKey);
77
		}
78
		final Bson q = and(query, gt("id", lastKey));
79
		final FindIterable<DBObject> it = collection.find(q).batchSize(size).sort(sortByIdAsc).limit(size);
80
		return Lists.newArrayList(it);
81
	}
82

    
83
	private Bson query(final Long from, final Long until, final Pattern pattern) {
84
		final Bson dateFilter = dateQuery(from, until);
85
		final Bson regexFilter = regexQuery(pattern);
86
		if (dateFilter != null & regexFilter != null) {
87
			return and(dateFilter, regexFilter);
88
		} else if (dateFilter != null) {
89
			return dateFilter;
90
		} else if (regexFilter != null) {
91
			return regexFilter;
92
		}
93
		return new BasicDBObject();
94
	}
95

    
96
	private Bson dateQuery(final Long from, final Long until) {
97
		if (from != null & until != null) {
98
			return and(gt("timestamp", from), lt("timestamp", until));
99
		}
100
		if (from != null) {
101
			return gt("timestamp", from);
102
		}
103
		if (until != null) {
104
			return lt("timestamp", until);
105
		}
106
		return null;
107
	}
108

    
109
	private Bson regexQuery(final Pattern pattern) {
110
		if (pattern != null) {
111
			return regex("body", pattern);
112
		}
113
		return null;
114
	}
115

    
116
	@Override
117
	public int getSize() {
118
		return (int) collection.count(query);
119
	}
120

    
121
	@Override
122
	public void setResultSet(final ResultSet resultSet) {
123
		resultSet.close();
124
	}
125

    
126
}
(5-5/5)