1 |
1 |
package eu.dnetlib.data.mdstore.modular.mongodb;
|
2 |
2 |
|
3 |
|
import java.util.*;
|
4 |
|
import java.util.concurrent.*;
|
|
3 |
import java.util.ArrayList;
|
|
4 |
import java.util.List;
|
|
5 |
import java.util.Map;
|
|
6 |
import java.util.Set;
|
|
7 |
import java.util.concurrent.ArrayBlockingQueue;
|
|
8 |
import java.util.concurrent.BlockingQueue;
|
|
9 |
import java.util.concurrent.Callable;
|
|
10 |
import java.util.concurrent.ExecutionException;
|
|
11 |
import java.util.concurrent.ExecutorService;
|
|
12 |
import java.util.concurrent.Executors;
|
|
13 |
import java.util.concurrent.Future;
|
|
14 |
import java.util.concurrent.TimeUnit;
|
5 |
15 |
import java.util.regex.Pattern;
|
6 |
16 |
|
|
17 |
import org.apache.commons.lang3.StringUtils;
|
|
18 |
import org.apache.commons.logging.Log;
|
|
19 |
import org.apache.commons.logging.LogFactory;
|
|
20 |
import org.bson.conversions.Bson;
|
|
21 |
import org.springframework.beans.factory.annotation.Required;
|
|
22 |
|
7 |
23 |
import com.google.common.base.Function;
|
8 |
24 |
import com.google.common.collect.Iterables;
|
9 |
25 |
import com.google.common.collect.Iterators;
|
... | ... | |
17 |
33 |
import com.mongodb.client.MongoCollection;
|
18 |
34 |
import com.mongodb.client.MongoDatabase;
|
19 |
35 |
import com.mongodb.client.model.IndexOptions;
|
|
36 |
|
20 |
37 |
import eu.dnetlib.data.mdstore.DocumentNotFoundException;
|
21 |
38 |
import eu.dnetlib.data.mdstore.MDStoreServiceException;
|
22 |
39 |
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
|
23 |
40 |
import eu.dnetlib.data.mdstore.modular.RecordParser;
|
24 |
41 |
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
|
25 |
42 |
import eu.dnetlib.enabling.resultset.ResultSetListener;
|
26 |
|
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
|
27 |
|
import org.apache.commons.lang3.StringUtils;
|
28 |
|
import org.apache.commons.logging.Log;
|
29 |
|
import org.apache.commons.logging.LogFactory;
|
30 |
|
import org.bson.conversions.Bson;
|
31 |
|
import org.springframework.beans.factory.annotation.Required;
|
32 |
43 |
|
33 |
44 |
public class MongoMDStore implements MDStore {
|
34 |
45 |
|
... | ... | |
37 |
48 |
|
38 |
49 |
private final boolean discardRecords;
|
39 |
50 |
private String id;
|
40 |
|
private MongoDatabase mongoDatabase;
|
|
51 |
private final MongoDatabase mongoDatabase;
|
41 |
52 |
private MongoCollection<DBObject> collection;
|
42 |
53 |
private MongoCollection<DBObject> discardedCollection;
|
43 |
54 |
private List<MDFormatDescription> mdformats;
|
... | ... | |
48 |
59 |
private final IndexOptions options = new IndexOptions().background(true);
|
49 |
60 |
|
50 |
61 |
public MongoMDStore(final String id,
|
51 |
|
final MongoCollection<DBObject> collection,
|
52 |
|
final RecordParser recordParser,
|
53 |
|
final boolean discardRecords,
|
54 |
|
final MongoDatabase mongoDatabase) {
|
|
62 |
final MongoCollection<DBObject> collection,
|
|
63 |
final RecordParser recordParser,
|
|
64 |
final boolean discardRecords,
|
|
65 |
final MongoDatabase mongoDatabase) {
|
55 |
66 |
this.id = id;
|
56 |
67 |
this.mongoDatabase = mongoDatabase;
|
57 |
68 |
this.collection = collection;
|
... | ... | |
77 |
88 |
int countStored = 0;
|
78 |
89 |
final Callable<Integer> writer = () -> {
|
79 |
90 |
final MongoBulkWritesManager bulkWritesManager =
|
80 |
|
new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
|
|
91 |
new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
|
81 |
92 |
int count = 0;
|
82 |
93 |
while (true) {
|
83 |
94 |
try {
|
84 |
|
final Object record = queue.take();
|
|
95 |
final Object record = queue.poll(1, TimeUnit.HOURS);
|
|
96 |
if (record == null) {
|
|
97 |
log.fatal("retrieved a null object, probably the feeding is failed");
|
|
98 |
throw new IllegalStateException("retrieved a null object, probably the feeding wf is failed");
|
|
99 |
}
|
85 |
100 |
if (record.equals(sentinel)) {
|
86 |
101 |
bulkWritesManager.flushBulks();
|
87 |
102 |
break;
|
... | ... | |
97 |
112 |
return count;
|
98 |
113 |
};
|
99 |
114 |
final ExecutorService executorService = Executors.newSingleThreadExecutor();
|
100 |
|
Future<Integer> storedCountInt = executorService.submit(writer);
|
|
115 |
final Future<Integer> storedCountInt = executorService.submit(writer);
|
101 |
116 |
|
102 |
117 |
try {
|
103 |
118 |
log.info("feeding mdstore " + id);
|
104 |
119 |
if (records != null) {
|
105 |
120 |
for (final String record : records) {
|
106 |
|
queue.put(record);
|
|
121 |
if (record != null) {
|
|
122 |
queue.put(record);
|
|
123 |
}
|
107 |
124 |
}
|
108 |
125 |
}
|
109 |
126 |
queue.put(sentinel);
|
... | ... | |
145 |
162 |
*/
|
146 |
163 |
public void replace(final String grep, final String replace) {
|
147 |
164 |
final Pattern regex = Pattern.compile(grep, Pattern.MULTILINE);
|
148 |
|
BasicDBObject query = (BasicDBObject) QueryBuilder.start("body").regex(regex).get();
|
|
165 |
final BasicDBObject query = (BasicDBObject) QueryBuilder.start("body").regex(regex).get();
|
149 |
166 |
final FindIterable<DBObject> matches = collection.find(query, DBObject.class);
|
150 |
|
//final DBCursor matches = collection.find(QueryBuilder.start("body").regex(regex).get());
|
151 |
|
if (log.isDebugEnabled())
|
|
167 |
// final DBCursor matches = collection.find(QueryBuilder.start("body").regex(regex).get());
|
|
168 |
if (log.isDebugEnabled()) {
|
152 |
169 |
log.debug("FOUND: " + Lists.newArrayList(matches).size());
|
|
170 |
}
|
153 |
171 |
|
154 |
172 |
for (final DBObject match : matches) {
|
155 |
173 |
final DBObject o = new BasicDBObject(match.toMap());
|
... | ... | |
169 |
187 |
}
|
170 |
188 |
|
171 |
189 |
public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer)
|
172 |
|
throws MDStoreServiceException {
|
173 |
|
final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
|
|
190 |
throws MDStoreServiceException {
|
|
191 |
final Pattern filter = recordFilter != null && recordFilter.length() > 0 ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
|
174 |
192 |
|
175 |
|
return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer); }
|
|
193 |
return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer);
|
|
194 |
}
|
176 |
195 |
|
177 |
196 |
private Long parseLong(final String s) throws MDStoreServiceException {
|
178 |
|
if (StringUtils.isBlank(s)) {
|
179 |
|
return null;
|
180 |
|
}
|
|
197 |
if (StringUtils.isBlank(s)) { return null; }
|
181 |
198 |
try {
|
182 |
199 |
return Long.valueOf(s);
|
183 |
|
} catch (NumberFormatException e) {
|
|
200 |
} catch (final NumberFormatException e) {
|
184 |
201 |
throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", e);
|
185 |
202 |
}
|
186 |
203 |
}
|
... | ... | |
198 |
215 |
@Override
|
199 |
216 |
public String getRecord(final String recordId) throws DocumentNotFoundException {
|
200 |
217 |
final DBObject obj = collection.find(new BasicDBObject("id", recordId)).first();
|
201 |
|
if (obj == null || !obj.containsField("body")) throw new DocumentNotFoundException(String.format(
|
202 |
|
"The document with id '%s' does not exist in mdstore: '%s'", recordId, id));
|
|
218 |
if (obj == null || !obj.containsField("body")) {
|
|
219 |
throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'", recordId, id));
|
|
220 |
}
|
203 |
221 |
final String body = (String) obj.get("body");
|
204 |
|
if (body.trim().length() == 0) throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'",
|
205 |
|
recordId, id));
|
|
222 |
if (body.trim().length() == 0) {
|
|
223 |
throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'", recordId, id));
|
|
224 |
}
|
206 |
225 |
return new SerializeMongoRecord().apply(obj);
|
207 |
226 |
}
|
208 |
227 |
|
... | ... | |
210 |
229 |
public List<String> deliver(final String mdId, final int pageSize, final int offset, final Map<String, String> queryParam) {
|
211 |
230 |
final QueryBuilder query = QueryBuilder.start();
|
212 |
231 |
|
213 |
|
for (String key : queryParam.keySet()) {
|
|
232 |
for (final String key : queryParam.keySet()) {
|
214 |
233 |
query.and(key).regex(Pattern.compile(queryParam.get(key), Pattern.LITERAL));
|
215 |
234 |
}
|
216 |
235 |
|
217 |
|
FindIterable<DBObject> dbObjects = offset > 0
|
218 |
|
? collection.find((Bson) query.get()).limit(pageSize).skip(offset)
|
219 |
|
: collection.find((Bson) query.get()).limit(pageSize);
|
|
236 |
final FindIterable<DBObject> dbObjects = offset > 0
|
|
237 |
? collection.find((Bson) query.get()).limit(pageSize).skip(offset)
|
|
238 |
: collection.find((Bson) query.get()).limit(pageSize);
|
220 |
239 |
|
221 |
|
|
222 |
240 |
queryParam.put("count", "" + collection.count((Bson) query.get()));
|
223 |
241 |
|
224 |
242 |
final List<String> result = new ArrayList<>();
|
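Fixed a problem with blocked threads: the writer now polls the queue with a one-hour timeout instead of blocking indefinitely on take(), and fails with an IllegalStateException when no record arrives.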