Revision 46851
Added by Claudio Atzori about 7 years ago
MongoResultSetListener.java | ||
---|---|---|
3 | 3 |
import java.util.function.Function; |
4 | 4 |
import java.util.regex.Pattern; |
5 | 5 |
|
6 |
import com.mongodb.BasicDBObject; |
|
6 | 7 |
import com.mongodb.DBObject; |
7 | 8 |
import com.mongodb.client.MongoCollection; |
8 | 9 |
import com.mongodb.client.MongoCursor; |
9 |
import com.mongodb.client.model.Filters; |
|
10 | 10 |
import com.mongodb.client.model.Sorts; |
11 | 11 |
import eu.dnetlib.enabling.resultset.listener.ResultSetListener; |
12 | 12 |
import eu.dnetlib.rmi.common.ResultSetException; |
... | ... | |
15 | 15 |
import org.apache.commons.logging.LogFactory; |
16 | 16 |
import org.bson.conversions.Bson; |
17 | 17 |
|
18 |
import static com.mongodb.client.model.Filters.*; |
|
19 |
|
|
18 | 20 |
public class MongoResultSetListener implements ResultSetListener<String> { |
19 | 21 |
|
20 | 22 |
private static final Log log = LogFactory.getLog(MongoResultSetListener.class); |
23 |
|
|
21 | 24 |
private MongoCursor<DBObject> currentCursor; |
22 | 25 |
private Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("id")); |
23 | 26 |
private Function<DBObject, String> serializer; |
24 |
private Pattern filter; |
|
27 |
|
|
25 | 28 |
private MongoCollection<DBObject> collection; |
26 | 29 |
private int total = 0; |
27 | 30 |
private int count = 0; |
28 | 31 |
|
29 |
public MongoResultSetListener(final MongoCollection<DBObject> collection, final Pattern filter, final Function<DBObject, String> serializer, final boolean noCursorTimeout) |
|
32 |
private Bson query; |
|
33 |
|
|
34 |
public MongoResultSetListener(final MongoCollection<DBObject> collection, final Long from, final Long until, final Pattern filter, final Function<DBObject, String> serializer, final int batchSize) |
|
30 | 35 |
throws MDStoreServiceException { |
31 |
log.debug("Creating resultset - noCursorTimeout: "+noCursorTimeout); |
|
32 | 36 |
this.collection = collection; |
33 |
this.filter = filter; |
|
34 | 37 |
this.serializer = serializer; |
35 |
createCursor(noCursorTimeout); |
|
36 |
if (currentCursor == null) |
|
37 |
throw new MDStoreServiceException("Error on creating resultset the iterator is null"); |
|
38 |
|
|
38 |
this.batchSize = batchSize; |
|
39 |
this.query = query(from, until, filter); |
|
40 |
createCursor(); |
|
39 | 41 |
} |
40 | 42 |
|
41 | 43 |
private void createCursor(final boolean noCursorTimeout) throws MDStoreServiceException { |
44 |
log.debug("init mdstore cursor using query " + query); |
|
42 | 45 |
try { |
43 |
if (filter != null) { |
|
44 |
Bson query = Filters.regex("body", filter); |
|
45 |
total = (int) collection.count(query); |
|
46 |
currentCursor = collection.find(query).noCursorTimeout(noCursorTimeout).sort(sortByIdAsc).iterator(); |
|
47 |
} else { |
|
48 |
total = (int) collection.count(); |
|
49 |
currentCursor = collection.find().noCursorTimeout(noCursorTimeout).sort(sortByIdAsc).iterator(); |
|
50 |
} |
|
46 |
total = (int) collection.count(query); |
|
47 |
currentCursor = collection.find(query).batchSize(batchSize).sort(sortByIdAsc).iterator(); |
|
51 | 48 |
} catch (Throwable e) { |
52 |
throw new MDStoreServiceException("Error on initialize the cursor"); |
|
49 |
throw new MDStoreServiceException("Error on initialize the Mongodb cursor");
|
|
53 | 50 |
} |
54 | 51 |
} |
55 | 52 |
|
53 |
private Bson query(final Long from, final Long until, final Pattern pattern) { |
|
54 |
final Bson dateFilter = dateQuery(from, until); |
|
55 |
final Bson regexFilter = regexQuery(pattern); |
|
56 |
if (dateFilter != null & regexFilter != null) { |
|
57 |
return and(dateFilter, regexFilter); |
|
58 |
} else if (dateFilter != null) { |
|
59 |
return dateFilter; |
|
60 |
} else if (regexFilter != null) { |
|
61 |
return regexFilter; |
|
62 |
} |
|
63 |
return new BasicDBObject(); |
|
64 |
} |
|
65 |
|
|
66 |
private Bson dateQuery(final Long from, final Long until) { |
|
67 |
if (from != null & until != null) { |
|
68 |
return and(gt("timestamp", from), lt("timestamp", until)); |
|
69 |
} |
|
70 |
if (from != null) { |
|
71 |
return gt("timestamp", from); |
|
72 |
} |
|
73 |
if (until != null) { |
|
74 |
return lt("timestamp", until); |
|
75 |
} |
|
76 |
return null; |
|
77 |
} |
|
78 |
|
|
79 |
private Bson regexQuery(final Pattern pattern) { |
|
80 |
if (pattern != null) { |
|
81 |
return regex("body", pattern); |
|
82 |
} |
|
83 |
return null; |
|
84 |
} |
|
85 |
|
|
56 | 86 |
@Override |
57 | 87 |
public boolean hasNext() { |
58 | 88 |
return currentCursor.hasNext(); |
Also available in: Unified diff
implemented use of from/until for incremental record delivery