Revision 47863
Added by Claudio Atzori almost 7 years ago
MongoResultSetListener.java | ||
---|---|---|
9 | 9 |
import com.google.common.collect.Lists; |
10 | 10 |
import com.mongodb.BasicDBObject; |
11 | 11 |
import com.mongodb.DBObject; |
12 |
import com.mongodb.QueryBuilder; |
|
13 | 12 |
import com.mongodb.client.FindIterable; |
14 | 13 |
import com.mongodb.client.MongoCollection; |
15 |
import com.mongodb.client.model.Filters; |
|
16 | 14 |
import com.mongodb.client.model.Sorts; |
17 | 15 |
import eu.dnetlib.enabling.resultset.ResultSet; |
18 | 16 |
import eu.dnetlib.enabling.resultset.ResultSetAware; |
... | ... | |
28 | 26 |
|
29 | 27 |
private static final Log log = LogFactory.getLog(MongoResultSetListener.class); |
30 | 28 |
|
31 |
private ConcurrentSizedMap<Integer, String> lastKeys = new ConcurrentSizedMap<Integer, String>();
|
|
29 |
private ConcurrentSizedMap<Integer, String> lastKeys = new ConcurrentSizedMap<>(); |
|
32 | 30 |
private Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("id")); |
33 | 31 |
|
34 | 32 |
private Function<DBObject, String> serializer; |
35 |
private Pattern filter; |
|
36 | 33 |
private MongoCollection<DBObject> collection; |
34 |
private Bson query; |
|
37 | 35 |
|
38 |
public MongoResultSetListener(final MongoCollection<DBObject> collection, final Pattern filter, final Function<DBObject, String> serializer) { |
|
36 |
public MongoResultSetListener(final MongoCollection<DBObject> collection, final Long from, final Long until, final Pattern filter, final Function<DBObject, String> serializer) {
|
|
39 | 37 |
this.collection = collection; |
40 |
this.filter = filter; |
|
41 | 38 |
this.serializer = serializer; |
39 |
this.query = query(from, until, filter); |
|
42 | 40 |
} |
43 | 41 |
|
44 | 42 |
@Override |
... | ... | |
66 | 64 |
} |
67 | 65 |
|
68 | 66 |
private ArrayList<DBObject> fetchNew(final int from, final int size) { |
69 |
FindIterable<DBObject> it = null; |
|
70 |
if (filter != null) { |
|
71 |
Bson query = Filters.regex("body", filter); |
|
72 |
it = collection.find(query); |
|
73 |
} else |
|
74 |
it = collection.find(); |
|
75 |
|
|
67 |
final FindIterable<DBObject> it = collection.find(query).batchSize(size); |
|
76 | 68 |
return Lists.newArrayList(it.sort(sortByIdAsc).skip(from).limit(size)); |
77 | 69 |
} |
78 | 70 |
|
... | ... | |
80 | 72 |
if (log.isDebugEnabled()) { |
81 | 73 |
log.debug("trying to continue from previous key: " + lastKey); |
82 | 74 |
} |
83 |
Bson filterQuery = gt("id", lastKey); |
|
84 |
if (filter != null) { |
|
85 |
filterQuery = and(filterQuery, regex("body", filter)); |
|
86 |
} |
|
87 |
final FindIterable<DBObject> it = collection.find(filterQuery).sort(sortByIdAsc).limit(size); |
|
75 |
final Bson q = and(query, gt("id", lastKey)); |
|
76 |
final FindIterable<DBObject> it = collection.find(q).batchSize(size).sort(sortByIdAsc).limit(size); |
|
88 | 77 |
return Lists.newArrayList(it); |
89 | 78 |
} |
90 | 79 |
|
80 |
private Bson query(final Long from, final Long until, final Pattern pattern) { |
|
81 |
final Bson dateFilter = dateQuery(from, until); |
|
82 |
final Bson regexFilter = regexQuery(pattern); |
|
83 |
if (dateFilter != null & regexFilter != null) { |
|
84 |
return and(dateFilter, regexFilter); |
|
85 |
} else if (dateFilter != null) { |
|
86 |
return dateFilter; |
|
87 |
} else if (regexFilter != null) { |
|
88 |
return regexFilter; |
|
89 |
} |
|
90 |
return new BasicDBObject(); |
|
91 |
} |
|
92 |
|
|
93 |
private Bson dateQuery(final Long from, final Long until) { |
|
94 |
if (from != null & until != null) { |
|
95 |
return and(gt("timestamp", from), lt("timestamp", until)); |
|
96 |
} |
|
97 |
if (from != null) { |
|
98 |
return gt("timestamp", from); |
|
99 |
} |
|
100 |
if (until != null) { |
|
101 |
return lt("timestamp", until); |
|
102 |
} |
|
103 |
return null; |
|
104 |
} |
|
105 |
|
|
106 |
private Bson regexQuery(final Pattern pattern) { |
|
107 |
if (pattern != null) { |
|
108 |
return regex("body", pattern); |
|
109 |
} |
|
110 |
return null; |
|
111 |
} |
|
112 |
|
|
91 | 113 |
@Override |
92 | 114 |
public int getSize() { |
93 |
if (filter != null) { |
|
94 |
BasicDBObject query = (BasicDBObject) QueryBuilder.start("body").regex(filter).get(); |
|
95 |
return (int) collection.count(query); |
|
96 |
} |
|
97 |
return (int) collection.count(); |
|
115 |
return (int) collection.count(query); |
|
98 | 116 |
} |
99 | 117 |
|
100 | 118 |
@Override |
Also available in: Unified diff
integrated latest changes from dnet40