Project

General

Profile

1 41825 sandro.lab
package eu.dnetlib.data.mdstore.modular.mongodb;
2
3
import java.util.function.Function;
4
import java.util.regex.Pattern;
5
6 46850 claudio.at
import com.mongodb.BasicDBObject;
7 41825 sandro.lab
import com.mongodb.DBObject;
8
import com.mongodb.client.MongoCollection;
9
import com.mongodb.client.MongoCursor;
10
import com.mongodb.client.model.Sorts;
11
import eu.dnetlib.enabling.resultset.listener.ResultSetListener;
12
import eu.dnetlib.rmi.common.ResultSetException;
13
import eu.dnetlib.rmi.data.MDStoreServiceException;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.bson.conversions.Bson;
17
18 46850 claudio.at
import static com.mongodb.client.model.Filters.*;
19
20 41825 sandro.lab
public class MongoResultSetListener implements ResultSetListener<String> {
21
22
	private static final Log log = LogFactory.getLog(MongoResultSetListener.class);
23 46850 claudio.at
24 41825 sandro.lab
	private MongoCursor<DBObject> currentCursor;
25
	private Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("id"));
26
	private Function<DBObject, String> serializer;
27 46850 claudio.at
28 41825 sandro.lab
	private MongoCollection<DBObject> collection;
29
	private int total = 0;
30
	private int count = 0;
31 45885 claudio.at
	private int batchSize;
32 41825 sandro.lab
33 46850 claudio.at
	private Bson query;
34
35
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Long from, final Long until, final Pattern filter, final Function<DBObject, String> serializer, final int batchSize)
36 41825 sandro.lab
			throws MDStoreServiceException {
37
		this.collection = collection;
38
		this.serializer = serializer;
39 45885 claudio.at
		this.batchSize = batchSize;
40 46850 claudio.at
		this.query = query(from, until, filter);
41 41825 sandro.lab
		createCursor();
42
	}
43
44
	private void createCursor() throws MDStoreServiceException {
45 46850 claudio.at
		log.debug("init mdstore cursor using query " + query);
46 41825 sandro.lab
		try {
47 46850 claudio.at
			total = (int) collection.count(query);
48 53647 sandro.lab
//			currentCursor = collection.find(query).batchSize(batchSize).sort(sortByIdAsc).noCursorTimeout(true).iterator();
49
            currentCursor = collection.find(query).batchSize(batchSize).noCursorTimeout(true).iterator();
50 41825 sandro.lab
		} catch (Throwable e) {
51 46850 claudio.at
			throw new MDStoreServiceException("Error on initialize the Mongodb cursor");
52 41825 sandro.lab
		}
53
	}
54
55 46850 claudio.at
	private Bson query(final Long from, final Long until, final Pattern pattern) {
56
		final Bson dateFilter = dateQuery(from, until);
57
		final Bson regexFilter = regexQuery(pattern);
58
		if (dateFilter != null & regexFilter != null) {
59
			return and(dateFilter, regexFilter);
60
		} else if (dateFilter != null) {
61
			return dateFilter;
62
		} else if (regexFilter != null) {
63
			return regexFilter;
64
		}
65
		return new BasicDBObject();
66
	}
67
68
	private Bson dateQuery(final Long from, final Long until) {
69
		if (from != null & until != null) {
70
			return and(gt("timestamp", from), lt("timestamp", until));
71
		}
72
		if (from != null) {
73
			return gt("timestamp", from);
74
		}
75
		if (until != null) {
76
			return lt("timestamp", until);
77
		}
78
		return null;
79
	}
80
81
	private Bson regexQuery(final Pattern pattern) {
82
		if (pattern != null) {
83
			return regex("body", pattern);
84
		}
85
		return null;
86
	}
87
88 41825 sandro.lab
	@Override
89
	public boolean hasNext() {
90
		return currentCursor.hasNext();
91
	}
92
93
	@Override
94
	public String next() throws ResultSetException {
95
		count++;
96
		return serializer.apply(currentCursor.next());
97
	}
98
99
	@Override
100
	public int getCount() {
101
		return count;
102
	}
103
104
	@Override
105
	public int getTotal() {
106
		return total;
107
	}
108
}