Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.function.Function;
4
import java.util.regex.Pattern;
5

    
6
import com.mongodb.BasicDBObject;
7
import com.mongodb.DBObject;
8
import com.mongodb.client.MongoCollection;
9
import com.mongodb.client.MongoCursor;
10
import com.mongodb.client.model.Sorts;
11
import eu.dnetlib.enabling.resultset.listener.ResultSetListener;
12
import eu.dnetlib.rmi.common.ResultSetException;
13
import eu.dnetlib.rmi.data.MDStoreServiceException;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.bson.conversions.Bson;
17

    
18
import static com.mongodb.client.model.Filters.*;
19

    
20
public class MongoResultSetListener implements ResultSetListener<String> {
21

    
22
	private static final Log log = LogFactory.getLog(MongoResultSetListener.class);
23

    
24
	private MongoCursor<DBObject> currentCursor;
25
	private Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("id"));
26
	private Function<DBObject, String> serializer;
27

    
28
	private MongoCollection<DBObject> collection;
29
	private int total = 0;
30
	private int count = 0;
31
	private int batchSize;
32

    
33
	private Bson query;
34

    
35
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Long from, final Long until, final Pattern filter, final Function<DBObject, String> serializer, final int batchSize)
36
			throws MDStoreServiceException {
37
		this.collection = collection;
38
		this.serializer = serializer;
39
		this.batchSize = batchSize;
40
		this.query = query(from, until, filter);
41
		createCursor();
42
	}
43

    
44
	private void createCursor() throws MDStoreServiceException {
45
		log.debug("init mdstore cursor using query " + query);
46
		try {
47
			total = (int) collection.count(query);
48
//			currentCursor = collection.find(query).batchSize(batchSize).sort(sortByIdAsc).noCursorTimeout(true).iterator();
49
            currentCursor = collection.find(query).batchSize(batchSize).noCursorTimeout(true).iterator();
50
		} catch (Throwable e) {
51
			throw new MDStoreServiceException("Error on initialize the Mongodb cursor");
52
		}
53
	}
54

    
55
	private Bson query(final Long from, final Long until, final Pattern pattern) {
56
		final Bson dateFilter = dateQuery(from, until);
57
		final Bson regexFilter = regexQuery(pattern);
58
		if (dateFilter != null & regexFilter != null) {
59
			return and(dateFilter, regexFilter);
60
		} else if (dateFilter != null) {
61
			return dateFilter;
62
		} else if (regexFilter != null) {
63
			return regexFilter;
64
		}
65
		return new BasicDBObject();
66
	}
67

    
68
	private Bson dateQuery(final Long from, final Long until) {
69
		if (from != null & until != null) {
70
			return and(gt("timestamp", from), lt("timestamp", until));
71
		}
72
		if (from != null) {
73
			return gt("timestamp", from);
74
		}
75
		if (until != null) {
76
			return lt("timestamp", until);
77
		}
78
		return null;
79
	}
80

    
81
	private Bson regexQuery(final Pattern pattern) {
82
		if (pattern != null) {
83
			return regex("body", pattern);
84
		}
85
		return null;
86
	}
87

    
88
	@Override
89
	public boolean hasNext() {
90
		return currentCursor.hasNext();
91
	}
92

    
93
	@Override
94
	public String next() throws ResultSetException {
95
		count++;
96
		return serializer.apply(currentCursor.next());
97
	}
98

    
99
	@Override
100
	public int getCount() {
101
		return count;
102
	}
103

    
104
	@Override
105
	public int getTotal() {
106
		return total;
107
	}
108
}
(6-6/6)