Project

General

Profile

« Previous | Next » 

Revision 46851

implemented use of from/until for incremental record delivery

View differences:

MongoResultSetListener.java
3 3
import java.util.function.Function;
4 4
import java.util.regex.Pattern;
5 5

  
6
import com.mongodb.BasicDBObject;
6 7
import com.mongodb.DBObject;
7 8
import com.mongodb.client.MongoCollection;
8 9
import com.mongodb.client.MongoCursor;
9
import com.mongodb.client.model.Filters;
10 10
import com.mongodb.client.model.Sorts;
11 11
import eu.dnetlib.enabling.resultset.listener.ResultSetListener;
12 12
import eu.dnetlib.rmi.common.ResultSetException;
......
15 15
import org.apache.commons.logging.LogFactory;
16 16
import org.bson.conversions.Bson;
17 17

  
18
import static com.mongodb.client.model.Filters.*;
19

  
18 20
public class MongoResultSetListener implements ResultSetListener<String> {
19 21

  
20 22
	private static final Log log = LogFactory.getLog(MongoResultSetListener.class);
23

  
21 24
	private MongoCursor<DBObject> currentCursor;
22 25
	private Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("id"));
23 26
	private Function<DBObject, String> serializer;
24
	private Pattern filter;
27

  
25 28
	private MongoCollection<DBObject> collection;
26 29
	private int total = 0;
27 30
	private int count = 0;
28 31

  
29
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Pattern filter, final Function<DBObject, String> serializer, final boolean noCursorTimeout)
32
	private Bson query;
33

  
34
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Long from, final Long until, final Pattern filter, final Function<DBObject, String> serializer, final int batchSize)
30 35
			throws MDStoreServiceException {
31
		log.debug("Creating resultset - noCursorTimeout: "+noCursorTimeout);
32 36
		this.collection = collection;
33
		this.filter = filter;
34 37
		this.serializer = serializer;
35
		createCursor(noCursorTimeout);
36
		if (currentCursor == null)
37
			throw new MDStoreServiceException("Error on creating resultset the iterator is null");
38

  
38
		this.batchSize = batchSize;
39
		this.query = query(from, until, filter);
40
		createCursor();
39 41
	}
40 42

  
41 43
	private void createCursor(final boolean noCursorTimeout) throws MDStoreServiceException {
44
		log.debug("init mdstore cursor using query " + query);
42 45
		try {
43
			if (filter != null) {
44
				Bson query = Filters.regex("body", filter);
45
				total = (int) collection.count(query);
46
				currentCursor = collection.find(query).noCursorTimeout(noCursorTimeout).sort(sortByIdAsc).iterator();
47
			} else {
48
				total = (int) collection.count();
49
				currentCursor = collection.find().noCursorTimeout(noCursorTimeout).sort(sortByIdAsc).iterator();
50
			}
46
			total = (int) collection.count(query);
47
			currentCursor = collection.find(query).batchSize(batchSize).sort(sortByIdAsc).iterator();
51 48
		} catch (Throwable e) {
52
			throw new MDStoreServiceException("Error on initialize the cursor");
49
			throw new MDStoreServiceException("Error on initialize the Mongodb cursor");
53 50
		}
54 51
	}
55 52

  
53
	private Bson query(final Long from, final Long until, final Pattern pattern) {
54
		final Bson dateFilter = dateQuery(from, until);
55
		final Bson regexFilter = regexQuery(pattern);
56
		if (dateFilter != null & regexFilter != null) {
57
			return and(dateFilter, regexFilter);
58
		} else if (dateFilter != null) {
59
			return dateFilter;
60
		} else if (regexFilter != null) {
61
			return regexFilter;
62
		}
63
		return new BasicDBObject();
64
	}
65

  
66
	private Bson dateQuery(final Long from, final Long until) {
67
		if (from != null & until != null) {
68
			return and(gt("timestamp", from), lt("timestamp", until));
69
		}
70
		if (from != null) {
71
			return gt("timestamp", from);
72
		}
73
		if (until != null) {
74
			return lt("timestamp", until);
75
		}
76
		return null;
77
	}
78

  
79
	private Bson regexQuery(final Pattern pattern) {
80
		if (pattern != null) {
81
			return regex("body", pattern);
82
		}
83
		return null;
84
	}
85

  
56 86
	@Override
57 87
	public boolean hasNext() {
58 88
		return currentCursor.hasNext();

Also available in: Unified diff