package eu.dnetlib.data.oai.store.mongo;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.FindIterable;
import com.mongodb.client.ListIndexesIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import eu.dnetlib.data.information.oai.publisher.OaiPublisherRuntimeException;
import eu.dnetlib.data.information.oai.publisher.PublisherField;
import eu.dnetlib.data.information.oai.publisher.conf.OAIConfigurationReader;
import eu.dnetlib.data.information.oai.publisher.info.RecordInfo;
import eu.dnetlib.data.information.oai.publisher.info.SetInfo;
import eu.dnetlib.data.oai.store.PublisherStore;
import eu.dnetlib.data.oai.store.RecordChangeDetector;
import eu.dnetlib.data.oai.store.parser.PublisherRecordParser;
import eu.dnetlib.data.oai.store.sets.MongoSetCollection;
import eu.dnetlib.functionality.cql.mongo.MongoCqlTranslator;
import eu.dnetlib.miscutils.functional.UnaryFunction;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.conversions.Bson;
import org.bson.types.Binary;
51
|
public class MongoPublisherStore implements PublisherStore<DNetOAIMongoCursor> {
|
52
|
|
53
|
private static final Log log = LogFactory.getLog(MongoPublisherStore.class); // NOPMD by marko on 11/24/08 5:02 PM
|
54
|
|
55
|
private String id, metadataFormat, interpretation, layout;
|
56
|
/** Keeps information about the fields to be created in mongo. **/
|
57
|
private List<PublisherField> mongoFields;
|
58
|
|
59
|
private MongoCollection<DBObject> collection;
|
60
|
private MongoCollection<DBObject> discardedCollection;
|
61
|
|
62
|
private RecordInfoGenerator recordInfoGenerator;
|
63
|
private MetadataExtractor metadataExtractor;
|
64
|
|
65
|
private RecordChangeDetector recordChangeDetector;
|
66
|
|
67
|
private MongoSetCollection mongoSetCollection;
|
68
|
|
69
|
/**
|
70
|
* Used to generate the OAI identifiers compliant to the protocol. See
|
71
|
* http://www.openarchives.org/OAI/openarchivesprotocol.html#UniqueIdentifier.
|
72
|
*/
|
73
|
private String idScheme;
|
74
|
/**
|
75
|
* Used to generate the OAI identifiers compliant to the protocol. See
|
76
|
* http://www.openarchives.org/OAI/openarchivesprotocol.html#UniqueIdentifier.
|
77
|
*/
|
78
|
private String idNamespace;
|
79
|
|
80
|
private boolean alwaysNewRecord;
|
81
|
|
82
|
public MongoPublisherStore() {
|
83
|
super();
|
84
|
}
|
85
|
|
86
|
public MongoPublisherStore(final String id,
|
87
|
final String metadataFormat,
|
88
|
final String interpretation,
|
89
|
final String layout,
|
90
|
final MongoCollection<DBObject> collection,
|
91
|
final List<PublisherField> mongoFields,
|
92
|
final RecordInfoGenerator recordInfoGenerator,
|
93
|
final String idScheme,
|
94
|
final String idNamespace,
|
95
|
final MetadataExtractor metadataExtractor,
|
96
|
final RecordChangeDetector recordChangeDetector,
|
97
|
final boolean alwaysNewRecord,
|
98
|
final MongoDatabase mongodb) {
|
99
|
super();
|
100
|
this.id = id;
|
101
|
this.metadataFormat = metadataFormat;
|
102
|
this.interpretation = interpretation;
|
103
|
this.layout = layout;
|
104
|
this.collection = collection;
|
105
|
this.discardedCollection = mongodb.getCollection("discarded-" + collection.getNamespace().getCollectionName(), DBObject.class);
|
106
|
this.mongoFields = mongoFields;
|
107
|
this.recordInfoGenerator = recordInfoGenerator;
|
108
|
this.idScheme = idScheme;
|
109
|
this.idNamespace = idNamespace;
|
110
|
this.recordChangeDetector = recordChangeDetector;
|
111
|
this.alwaysNewRecord = alwaysNewRecord;
|
112
|
}
|
113
|
|
114
|
@Override
|
115
|
public RecordInfo getRecord(final String recordId) {
|
116
|
Bson query = Filters.eq(OAIConfigurationReader.ID_FIELD, recordId);
|
117
|
DBObject result = this.collection.find(query).first();
|
118
|
log.debug(result);
|
119
|
return this.recordInfoGenerator.transformDBObject(result, true);
|
120
|
}
|
121
|
|
122
|
@Override
|
123
|
public RecordInfo getRecord(final String recordId, final UnaryFunction<String, String> unaryFunction) {
|
124
|
RecordInfo result = this.getRecord(recordId);
|
125
|
if (result != null) {
|
126
|
String transformedBody = unaryFunction.evaluate(result.getMetadata());
|
127
|
result.setMetadata(transformedBody);
|
128
|
}
|
129
|
return result;
|
130
|
}
|
131
|
|
132
|
@Override
|
133
|
public DNetOAIMongoCursor getRecords(final String queryString, final boolean bodyIncluded, final int limit) {
|
134
|
FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
|
135
|
return new DNetOAIMongoCursor(iter.iterator(), bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
|
136
|
}
|
137
|
|
138
|
@Override
|
139
|
public DNetOAIMongoCursor getRecords(final String queryString,
|
140
|
final UnaryFunction<String, String> unaryFunction,
|
141
|
final boolean bodyIncluded,
|
142
|
final int limit) {
|
143
|
FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
|
144
|
return new DNetOAIMongoCursor(iter.iterator(), unaryFunction, bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
|
145
|
}
|
146
|
|
147
|
private FindIterable<DBObject> loggedFindByQuery(final String queryString, final int limit) {
|
148
|
final Bson query = parseQuery(queryString);
|
149
|
long start = System.currentTimeMillis();
|
150
|
Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("_id"));
|
151
|
FindIterable<DBObject> iter = this.collection.find(query).sort(sortByIdAsc).limit(limit);
|
152
|
long end = System.currentTimeMillis();
|
153
|
log.debug("Query:" + query + "\ntime to get mongo iterable (ms): " + (end - start));
|
154
|
return iter;
|
155
|
}
|
156
|
|
157
|
private Bson parseQuery(final String query) {
|
158
|
try {
|
159
|
return MongoCqlTranslator.toMongo(query);
|
160
|
} catch(Exception e ) {
|
161
|
throw new OaiPublisherRuntimeException(e);
|
162
|
}
|
163
|
}
|
164
|
|
165
|
@Override
|
166
|
public List<PublisherField> getIndices() {
|
167
|
return this.mongoFields;
|
168
|
}
|
169
|
|
170
|
/**
|
171
|
* <p>
|
172
|
* Ensure indices on the configuration-defined fields and on the system fields DATESTAMP_FIELD and LAST_COLLECTION_DATE_FIELD.
|
173
|
* <p>
|
174
|
* <p>
|
175
|
* Note that by default ID_FIELD, SET_FIELD, DELETED_FIELD, BODY_FIELD, UPDATED_FIELD are not indexed. If you want an index on those,
|
176
|
* then you have to specify it in the configuration file of the OAI Publisher: <br>
|
177
|
* <INDEX name="deleted">
|
178
|
* </p>
|
179
|
*
|
180
|
* {@inheritDoc}
|
181
|
*
|
182
|
*/
|
183
|
@Override
|
184
|
public void ensureIndices() {
|
185
|
final ListIndexesIterable<BasicDBObject> indexesIterable = this.collection.listIndexes(BasicDBObject.class);
|
186
|
final IndexOptions indexOptions = new IndexOptions().background(true);
|
187
|
Stopwatch sw = Stopwatch.createUnstarted();
|
188
|
sw.start();
|
189
|
// I want to keep the composite indexes that might have been defined manually
|
190
|
// I DO NOT NEED TO DO THIS. If the indexes are there, they are there.
|
191
|
/*log.debug("Ensuring currently defined composite indexes on store "+id+": ");
|
192
|
for (BasicDBObject o : indexesIterable) {
|
193
|
BasicDBObject fieldIndexed = (BasicDBObject) o.get("key");
|
194
|
if (fieldIndexed.keySet().size() > 1) {
|
195
|
log.debug(o);
|
196
|
this.collection.createIndex(fieldIndexed, indexOptions);
|
197
|
}
|
198
|
}*/
|
199
|
// Indexes on single fields.
|
200
|
for (PublisherField field : this.mongoFields) {
|
201
|
BasicDBObject mongoIdx = new BasicDBObject(field.getFieldName(), 1);
|
202
|
log.debug("Creating index on store "+id+" : " + mongoIdx);
|
203
|
this.collection.createIndex(mongoIdx, indexOptions);
|
204
|
}
|
205
|
log.debug("Creating index over : " + OAIConfigurationReader.DATESTAMP_FIELD);
|
206
|
this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.DATESTAMP_FIELD, 1), indexOptions);
|
207
|
log.debug("Creating index over : " + OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD);
|
208
|
this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, 1), indexOptions);
|
209
|
sw.stop();
|
210
|
log.info("All indexes have been updated in " + sw.elapsed(TimeUnit.MILLISECONDS) + " milliseconds");
|
211
|
}
|
212
|
|
213
|
/**
|
214
|
* Creates a compound index over the specified fields on the given store.
|
215
|
* <p>
|
216
|
* The creation is performed on the background
|
217
|
* </p>
|
218
|
*
|
219
|
* @param fieldNames
|
220
|
* List of fields to be included in the compound index
|
221
|
* @theStore MongoPublisherStore where to create the index
|
222
|
*/
|
223
|
public void createCompoundIndex(final List<String> fieldNames) {
|
224
|
if ((fieldNames == null) || fieldNames.isEmpty()) {
|
225
|
log.fatal("No fields specified for the creation of the compound index");
|
226
|
}
|
227
|
BasicDBObjectBuilder theIndexBuilder = BasicDBObjectBuilder.start();
|
228
|
for (String f : fieldNames) {
|
229
|
theIndexBuilder.add(f, 1);
|
230
|
}
|
231
|
BasicDBObject theIndex = (BasicDBObject) theIndexBuilder.get();
|
232
|
log.info("Creating index " + theIndex + " on " + this.getId());
|
233
|
this.getCollection().createIndex(theIndex, new IndexOptions().background(true));
|
234
|
}
|
235
|
|
236
|
private void dropDiscarded(final String source) {
|
237
|
if (StringUtils.isBlank(source)) {
|
238
|
log.debug("Dropping discarded records from publisherStore " + id);
|
239
|
discardedCollection.drop();
|
240
|
} else {
|
241
|
log.debug("Dropping discarded records for source " + source + " from publisherStore " + id);
|
242
|
discardedCollection.deleteMany(Filters.eq(OAIConfigurationReader.SET_FIELD, source));
|
243
|
}
|
244
|
}
|
245
|
|
246
|
@Override
|
247
|
public int feed(final Iterable<String> records, final String source) {
|
248
|
final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(80);
|
249
|
final Object sentinel = new Object();
|
250
|
this.dropDiscarded(source);
|
251
|
final Date feedDate = new Date();
|
252
|
Thread background = new Thread(new Runnable() {
|
253
|
|
254
|
@Override
|
255
|
public void run() {
|
256
|
//For fast feeding we want to use a collection with unack write concern
|
257
|
final MongoCollection<DBObject> unackCollection = collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED);
|
258
|
while (true) {
|
259
|
try {
|
260
|
Object record = queue.take();
|
261
|
if (record == sentinel) {
|
262
|
break;
|
263
|
}
|
264
|
safeFeedRecord((String) record, source, feedDate, unackCollection);
|
265
|
} catch (InterruptedException e) {
|
266
|
log.fatal("got exception in background thread", e);
|
267
|
throw new IllegalStateException(e);
|
268
|
}
|
269
|
}
|
270
|
}
|
271
|
});
|
272
|
background.start();
|
273
|
long startFeed = feedDate.getTime();
|
274
|
try {
|
275
|
log.info("feeding publisherStore " + id);
|
276
|
for (final String record : records) {
|
277
|
queue.put(record);
|
278
|
}
|
279
|
queue.put(sentinel);
|
280
|
log.info("finished feeding publisherStore " + id);
|
281
|
|
282
|
background.join();
|
283
|
} catch (InterruptedException e) {
|
284
|
throw new IllegalStateException(e);
|
285
|
}
|
286
|
long endFeed = System.currentTimeMillis();
|
287
|
log.fatal("OAI STORE " + id + " FEEDING COMPLETED IN " + (endFeed - startFeed) + "ms");
|
288
|
this.setDeletedFlags(feedDate, source);
|
289
|
return this.count();
|
290
|
}
|
291
|
|
292
|
/**
|
293
|
* Launches the thread that flags the records to be considered as 'deleted'.
|
294
|
* <p>
|
295
|
* The datestamp of the deleted records must be updated as well, according to the OAI specs available at
|
296
|
* http://www.openarchives.org/OAI/openarchivesprotocol.html#DeletedRecords: if a repository does keep track of deletions then the
|
297
|
* datestamp of the deleted record must be the date and time that it was deleted.
|
298
|
* </p>
|
299
|
*
|
300
|
* @param feedDate
|
301
|
* @param source
|
302
|
*/
|
303
|
private void setDeletedFlags(final Date feedDate, final String source) {
|
304
|
//get the collection with ACKNOWLEDGE Write concern
|
305
|
final MongoCollection<DBObject> ackCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
306
|
Thread deletedSetter = new Thread(new Runnable() {
|
307
|
|
308
|
@Override
|
309
|
public void run() {
|
310
|
Bson filter = Filters.and(Filters.eq(OAIConfigurationReader.DELETED_FIELD, false),
|
311
|
Filters.lt(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate));
|
312
|
if (!StringUtils.isBlank(source)) {
|
313
|
filter = Filters.and(filter, Filters.eq(OAIConfigurationReader.SET_FIELD, source));
|
314
|
}
|
315
|
log.debug("Delete flag query: " + filter);
|
316
|
BasicDBObject update = new BasicDBObject("$set",
|
317
|
BasicDBObjectBuilder.start(OAIConfigurationReader.DELETED_FIELD, true).append(OAIConfigurationReader.DATESTAMP_FIELD, feedDate)
|
318
|
.append(OAIConfigurationReader.UPDATED_FIELD, true).get());
|
319
|
log.debug("Updating as: " + update.toString());
|
320
|
final UpdateResult updateResult = ackCollection.updateMany(filter, update, new UpdateOptions().upsert(false));
|
321
|
log.debug("Deleted flags set for source: " + source + " #records = " + updateResult.getModifiedCount());
|
322
|
}
|
323
|
});
|
324
|
|
325
|
deletedSetter.start();
|
326
|
try {
|
327
|
deletedSetter.join();
|
328
|
} catch (InterruptedException e) {
|
329
|
throw new IllegalStateException(e);
|
330
|
}
|
331
|
}
|
332
|
|
333
|
@Override
|
334
|
public void drop() {
|
335
|
this.collection.drop();
|
336
|
}
|
337
|
|
338
|
@Override
|
339
|
public void drop(final String queryString) {
|
340
|
Bson query = parseQuery(queryString);
|
341
|
final DeleteResult deleteResult = this.collection.deleteMany(query);
|
342
|
log.debug("Deleted by query: " + queryString + " #deleted: " + deleteResult.getDeletedCount());
|
343
|
|
344
|
}
|
345
|
|
346
|
@Override
|
347
|
public int count() {
|
348
|
return (int) this.collection.count();
|
349
|
}
|
350
|
|
351
|
@Override
|
352
|
public int count(final String queryString) {
|
353
|
if (StringUtils.isBlank(queryString)) return (int) this.collection.count();
|
354
|
Bson query = parseQuery(queryString);
|
355
|
return (int) this.collection.count(query);
|
356
|
}
|
357
|
|
358
|
public List<String> getDistinctSetNamesFromRecords() {
|
359
|
log.info("Going to ask for all distinct sets in the oaistore " + id + ": this may take a long time...");
|
360
|
return Lists.newArrayList(this.collection.distinct(OAIConfigurationReader.SET_FIELD, String.class));
|
361
|
}
|
362
|
|
363
|
// ***********************************************************************************************//
|
364
|
// Feed utilities
|
365
|
// ***********************************************************************************************//
|
366
|
private boolean safeFeedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
|
367
|
try {
|
368
|
if (!record.isEmpty()) return feedRecord(record, source, feedDate, unackCollection);
|
369
|
} catch (final Throwable e) {
|
370
|
log.error("Got unhandled exception while parsing record", e);
|
371
|
discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record));
|
372
|
}
|
373
|
return false;
|
374
|
}
|
375
|
|
376
|
/**
|
377
|
* Feed the record to the store.
|
378
|
*
|
379
|
* @return true if the record is new, false otherwise
|
380
|
*/
|
381
|
private boolean feedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
|
382
|
PublisherRecordParser parser = new PublisherRecordParser(this.mongoFields);
|
383
|
final Multimap<String, String> recordProperties = parser.parseRecord(record, source);
|
384
|
String id = "";
|
385
|
String oaiID = "";
|
386
|
if (recordProperties.containsKey(OAIConfigurationReader.ID_FIELD)) {
|
387
|
id = recordProperties.get(OAIConfigurationReader.ID_FIELD).iterator().next();
|
388
|
oaiID = getOAIIdentifier(id);
|
389
|
if (isNewRecord(oaiID)) {
|
390
|
feedNew(oaiID, record, recordProperties, feedDate, unackCollection);
|
391
|
return true;
|
392
|
} else {
|
393
|
if (isChanged(oaiID, record)) {
|
394
|
updateRecord(oaiID, record, recordProperties, feedDate, unackCollection);
|
395
|
} else {
|
396
|
// it is not changed, I only have to update the last collection date
|
397
|
handleRecord(oaiID, feedDate, unackCollection);
|
398
|
}
|
399
|
}
|
400
|
} else {
|
401
|
log.error("parsed record seems invalid -- no identifier property with name: " + OAIConfigurationReader.ID_FIELD);
|
402
|
discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record).append(
|
403
|
OAIConfigurationReader.DATESTAMP_FIELD, feedDate));
|
404
|
}
|
405
|
return false;
|
406
|
}
|
407
|
|
408
|
private BasicDBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties) {
|
409
|
BasicDBObject obj = new BasicDBObject();
|
410
|
for (final String key : recordProperties.keySet()) {
|
411
|
if (key.equals(OAIConfigurationReader.ID_FIELD)) {
|
412
|
obj.put(key, oaiID);
|
413
|
} else {
|
414
|
Collection<String> values = recordProperties.get(key);
|
415
|
if (key.equals(OAIConfigurationReader.SET_FIELD)) {
|
416
|
|
417
|
Iterable<String> setSpecs = Iterables.transform(values, new Function<String, String>() {
|
418
|
|
419
|
@Override
|
420
|
public String apply(final String s) {
|
421
|
return mongoSetCollection.normalizeSetSpec(s);
|
422
|
}
|
423
|
|
424
|
});
|
425
|
obj.put(key, setSpecs);
|
426
|
} else {
|
427
|
// let's check if the key is the name of a repeatable field or not
|
428
|
PublisherField keyField = Iterables.find(this.mongoFields, new Predicate<PublisherField>() {
|
429
|
|
430
|
@Override
|
431
|
public boolean apply(final PublisherField field) {
|
432
|
return field.getFieldName().equals(key);
|
433
|
}
|
434
|
}, null);
|
435
|
if (keyField == null) {
|
436
|
log.warn("Expected field to index: " + key + " could not be found, but we keep going...");
|
437
|
}
|
438
|
if ((keyField != null) && !keyField.isRepeatable()) {
|
439
|
if ((values != null) && !values.isEmpty()) {
|
440
|
obj.put(key, values.iterator().next());
|
441
|
}
|
442
|
} else {
|
443
|
obj.put(key, values);
|
444
|
}
|
445
|
}
|
446
|
}
|
447
|
}
|
448
|
try {
|
449
|
obj.put(OAIConfigurationReader.BODY_FIELD, createCompressRecord(record));
|
450
|
obj.put(OAIConfigurationReader.DELETED_FIELD, false);
|
451
|
return obj;
|
452
|
} catch (IOException e) {
|
453
|
throw new OaiPublisherRuntimeException(e);
|
454
|
}
|
455
|
}
|
456
|
|
457
|
/**
|
458
|
* @param record
|
459
|
* @throws IOException
|
460
|
*/
|
461
|
public Binary createCompressRecord(final String record) throws IOException {
|
462
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
463
|
ZipOutputStream zos = new ZipOutputStream(os);
|
464
|
ZipEntry entry = new ZipEntry(OAIConfigurationReader.BODY_FIELD);
|
465
|
zos.putNextEntry(entry);
|
466
|
zos.write(record.getBytes());
|
467
|
zos.closeEntry();
|
468
|
zos.flush();
|
469
|
zos.close();
|
470
|
return new Binary(os.toByteArray());
|
471
|
}
|
472
|
|
473
|
private void feedNew(final String oaiID,
|
474
|
final String record,
|
475
|
final Multimap<String, String> recordProperties,
|
476
|
final Date feedDate,
|
477
|
final MongoCollection<DBObject> unackCollection) {
|
478
|
log.debug("New record received. Assigned oai id: " + oaiID);
|
479
|
DBObject obj = this.createBasicObject(oaiID, record, recordProperties);
|
480
|
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
|
481
|
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
|
482
|
obj.put(OAIConfigurationReader.UPDATED_FIELD, false);
|
483
|
unackCollection.insertOne(obj);
|
484
|
this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD));
|
485
|
}
|
486
|
|
487
|
private void updateRecord(final String oaiID,
|
488
|
final String record,
|
489
|
final Multimap<String, String> recordProperties,
|
490
|
final Date feedDate,
|
491
|
final MongoCollection<DBObject> unackCollection) {
|
492
|
log.debug("updating record " + oaiID);
|
493
|
BasicDBObject obj = this.createBasicObject(oaiID, record, recordProperties);
|
494
|
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
|
495
|
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
|
496
|
obj.put(OAIConfigurationReader.UPDATED_FIELD, true);
|
497
|
Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
|
498
|
unackCollection.replaceOne(oldObj, obj, new UpdateOptions().upsert(true));
|
499
|
this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD));
|
500
|
}
|
501
|
|
502
|
public void upsertSets(final Iterable<String> setNames) {
|
503
|
// feed the list of sets, if any
|
504
|
if (setNames != null) {
|
505
|
for (String setName : setNames) {
|
506
|
if (StringUtils.isNotBlank(setName)) {
|
507
|
final SetInfo set = new SetInfo();
|
508
|
String setSpec = this.mongoSetCollection.normalizeSetSpec(setName);
|
509
|
set.setSetSpec(setSpec);
|
510
|
set.setSetName(setName);
|
511
|
set.setSetDescription("This set contains metadata records whose provenance is " + setName);
|
512
|
set.setEnabled(true);
|
513
|
String query = "(" + OAIConfigurationReader.SET_FIELD + " = \"" + setSpec + "\") ";
|
514
|
set.setQuery(query);
|
515
|
this.mongoSetCollection.upsertSet(set, false, getCollection().getNamespace().getDatabaseName());
|
516
|
}
|
517
|
}
|
518
|
}
|
519
|
}
|
520
|
|
521
|
private void handleRecord(final String oaiID, final Date lastCollectionDate, final MongoCollection<DBObject> unackCollection) {
|
522
|
log.debug("handling unchanged record " + oaiID);
|
523
|
Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
|
524
|
BasicDBObject update = new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, lastCollectionDate));
|
525
|
unackCollection.updateOne(oldObj, update, new UpdateOptions().upsert(true));
|
526
|
}
|
527
|
|
528
|
private boolean isNewRecord(final String oaiIdentifier) {
|
529
|
if (alwaysNewRecord || (collection.count() == 0)) return true;
|
530
|
return this.collection.find(Filters.eq(OAIConfigurationReader.ID_FIELD, oaiIdentifier)).first() == null;
|
531
|
}
|
532
|
|
533
|
// ***********************************************************************************************//
|
534
|
// Setters / Getters / Basic utilities
|
535
|
// ***********************************************************************************************//
|
536
|
|
537
|
private boolean isChanged(final String oaiID, final String record) {
|
538
|
RecordInfo oldRecord = getRecord(oaiID);
|
539
|
if (oldRecord == null) return StringUtils.isBlank(record);
|
540
|
return this.recordChangeDetector.differs(oldRecord.getMetadata(), record);
|
541
|
}
|
542
|
|
543
|
private String getOAIIdentifier(final String id) {
|
544
|
return this.idScheme + ":" + this.idNamespace + ":" + id;
|
545
|
}
|
546
|
|
547
|
@Override
|
548
|
public String toString() {
|
549
|
return "MongoPublisherStore{" +
|
550
|
"id='" + id + '\'' +
|
551
|
", metadataFormat='" + metadataFormat + '\'' +
|
552
|
", interpretation='" + interpretation + '\'' +
|
553
|
", layout='" + layout + '\'' +
|
554
|
", idScheme='" + idScheme + '\'' +
|
555
|
", idNamespace='" + idNamespace + '\'' +
|
556
|
", alwaysNewRecord=" + alwaysNewRecord +
|
557
|
'}';
|
558
|
}
|
559
|
|
560
|
@Override
|
561
|
public int hashCode() {
|
562
|
final int prime = 31;
|
563
|
int result = 1;
|
564
|
result = (prime * result) + ((collection == null) ? 0 : collection.hashCode());
|
565
|
result = (prime * result) + ((id == null) ? 0 : id.hashCode());
|
566
|
result = (prime * result) + ((interpretation == null) ? 0 : interpretation.hashCode());
|
567
|
result = (prime * result) + ((layout == null) ? 0 : layout.hashCode());
|
568
|
result = (prime * result) + ((metadataFormat == null) ? 0 : metadataFormat.hashCode());
|
569
|
return result;
|
570
|
}
|
571
|
|
572
|
@Override
|
573
|
public boolean equals(final Object obj) {
|
574
|
if (this == obj) return true;
|
575
|
if (obj == null) return false;
|
576
|
if (!(obj instanceof MongoPublisherStore)) return false;
|
577
|
MongoPublisherStore other = (MongoPublisherStore) obj;
|
578
|
if (collection == null) {
|
579
|
if (other.collection != null) return false;
|
580
|
} else if (!collection.equals(other.collection)) return false;
|
581
|
if (id == null) {
|
582
|
if (other.id != null) return false;
|
583
|
} else if (!id.equals(other.id)) return false;
|
584
|
if (interpretation == null) {
|
585
|
if (other.interpretation != null) return false;
|
586
|
} else if (!interpretation.equals(other.interpretation)) return false;
|
587
|
if (layout == null) {
|
588
|
if (other.layout != null) return false;
|
589
|
} else if (!layout.equals(other.layout)) return false;
|
590
|
if (metadataFormat == null) {
|
591
|
if (other.metadataFormat != null) return false;
|
592
|
} else if (!metadataFormat.equals(other.metadataFormat)) return false;
|
593
|
return true;
|
594
|
}
|
595
|
|
596
|
public MongoCollection<DBObject> getCollection() {
|
597
|
return collection;
|
598
|
}
|
599
|
|
600
|
public void setCollection(final MongoCollection<DBObject> collection) {
|
601
|
this.collection = collection;
|
602
|
}
|
603
|
|
604
|
public MongoCollection<DBObject> getDiscardedCollection() {
|
605
|
return discardedCollection;
|
606
|
}
|
607
|
|
608
|
public void setDiscardedCollection(final MongoCollection<DBObject> discardedCollection) {
|
609
|
this.discardedCollection = discardedCollection;
|
610
|
}
|
611
|
|
612
|
public String getIdScheme() {
|
613
|
return idScheme;
|
614
|
}
|
615
|
|
616
|
public void setIdScheme(final String idScheme) {
|
617
|
this.idScheme = idScheme;
|
618
|
}
|
619
|
|
620
|
public String getIdNamespace() {
|
621
|
return idNamespace;
|
622
|
}
|
623
|
|
624
|
public void setIdNamespace(final String idNamespace) {
|
625
|
this.idNamespace = idNamespace;
|
626
|
}
|
627
|
|
628
|
public RecordInfoGenerator getRecordInfoGenerator() {
|
629
|
return recordInfoGenerator;
|
630
|
}
|
631
|
|
632
|
public void setRecordInfoGenerator(final RecordInfoGenerator recordInfoGenerator) {
|
633
|
this.recordInfoGenerator = recordInfoGenerator;
|
634
|
}
|
635
|
|
636
|
public MetadataExtractor getMetadataExtractor() {
|
637
|
return metadataExtractor;
|
638
|
}
|
639
|
|
640
|
public void setMetadataExtractor(final MetadataExtractor metadataExtractor) {
|
641
|
this.metadataExtractor = metadataExtractor;
|
642
|
}
|
643
|
|
644
|
public RecordChangeDetector getRecordChangeDetector() {
|
645
|
return recordChangeDetector;
|
646
|
}
|
647
|
|
648
|
public void setRecordChangeDetector(final RecordChangeDetector recordChangeDetector) {
|
649
|
this.recordChangeDetector = recordChangeDetector;
|
650
|
}
|
651
|
|
652
|
@Override
|
653
|
public String getId() {
|
654
|
return this.id;
|
655
|
}
|
656
|
|
657
|
public void setId(final String id) {
|
658
|
this.id = id;
|
659
|
}
|
660
|
|
661
|
@Override
|
662
|
public String getMetadataFormat() {
|
663
|
return this.metadataFormat;
|
664
|
}
|
665
|
|
666
|
public void setMetadataFormat(final String metadataFormat) {
|
667
|
this.metadataFormat = metadataFormat;
|
668
|
}
|
669
|
|
670
|
@Override
|
671
|
public String getInterpretation() {
|
672
|
return this.interpretation;
|
673
|
}
|
674
|
|
675
|
public void setInterpretation(final String interpretation) {
|
676
|
this.interpretation = interpretation;
|
677
|
}
|
678
|
|
679
|
@Override
|
680
|
public String getLayout() {
|
681
|
return this.layout;
|
682
|
}
|
683
|
|
684
|
public void setLayout(final String layout) {
|
685
|
this.layout = layout;
|
686
|
}
|
687
|
|
688
|
public MongoSetCollection getMongoSetCollection() {
|
689
|
return mongoSetCollection;
|
690
|
}
|
691
|
|
692
|
public void setMongoSetCollection(final MongoSetCollection mongoSetCollection) {
|
693
|
this.mongoSetCollection = mongoSetCollection;
|
694
|
}
|
695
|
|
696
|
public List<PublisherField> getMongoFields() {
|
697
|
return mongoFields;
|
698
|
}
|
699
|
|
700
|
public void setMongoFields(final List<PublisherField> mongoFields) {
|
701
|
this.mongoFields = mongoFields;
|
702
|
}
|
703
|
|
704
|
public boolean isAlwaysNewRecord() {
|
705
|
return alwaysNewRecord;
|
706
|
}
|
707
|
|
708
|
public void setAlwaysNewRecord(final boolean alwaysNewRecord) {
|
709
|
this.alwaysNewRecord = alwaysNewRecord;
|
710
|
}
|
711
|
|
712
|
}
|