1
|
package eu.dnetlib.oai.mongo;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.mongodb.BasicDBObject;
import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;
import eu.dnetlib.cql.CqlTranslator;
import eu.dnetlib.oai.PublisherField;
import eu.dnetlib.oai.PublisherStore;
import eu.dnetlib.oai.RecordChangeDetector;
import eu.dnetlib.oai.conf.OAIConfigurationReader;
import eu.dnetlib.oai.info.RecordInfo;
import eu.dnetlib.oai.info.SetInfo;
import eu.dnetlib.oai.parser.PublisherRecordParser;
import eu.dnetlib.oai.sets.MongoSetCollection;
import eu.dnetlib.rmi.provision.OaiPublisherRuntimeException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.conversions.Bson;
import org.bson.types.Binary;
|
48
|
|
49
|
public class MongoPublisherStore implements PublisherStore<DNetOAIMongoCursor> {
|
50
|
|
51
|
private static final Log log = LogFactory.getLog(MongoPublisherStore.class); // NOPMD by marko on 11/24/08 5:02 PM
|
52
|
|
53
|
private String id, metadataFormat, interpretation, layout;
|
54
|
/**
|
55
|
* Keeps information about the fields to be created in mongo.
|
56
|
**/
|
57
|
private List<PublisherField> mongoFields;
|
58
|
|
59
|
private MongoCollection<DBObject> collection;
|
60
|
private MongoCollection<DBObject> discardedCollection;
|
61
|
|
62
|
private CqlTranslator cqlTranslator;
|
63
|
private RecordInfoGenerator recordInfoGenerator;
|
64
|
private MetadataExtractor metadataExtractor;
|
65
|
|
66
|
private RecordChangeDetector recordChangeDetector;
|
67
|
|
68
|
private MongoSetCollection mongoSetCollection;
|
69
|
|
70
|
|
71
|
/**
|
72
|
* Used to generate the OAI identifiers compliant to the protocol. See
|
73
|
* http://www.openarchives.org/OAI/openarchivesprotocol.html#UniqueIdentifier.
|
74
|
*/
|
75
|
private String idScheme;
|
76
|
/**
|
77
|
* Used to generate the OAI identifiers compliant to the protocol. See
|
78
|
* http://www.openarchives.org/OAI/openarchivesprotocol.html#UniqueIdentifier.
|
79
|
*/
|
80
|
private String idNamespace;
|
81
|
|
82
|
private boolean alwaysNewRecord;
|
83
|
|
84
|
public MongoPublisherStore() {
|
85
|
super();
|
86
|
}
|
87
|
|
88
|
public MongoPublisherStore(final String id,
|
89
|
final String metadataFormat,
|
90
|
final String interpretation,
|
91
|
final String layout,
|
92
|
final MongoCollection<DBObject> collection,
|
93
|
final List<PublisherField> mongoFields,
|
94
|
final CqlTranslator cqlTranslator,
|
95
|
final RecordInfoGenerator recordInfoGenerator,
|
96
|
final String idScheme,
|
97
|
final String idNamespace,
|
98
|
final MetadataExtractor metadataExtractor,
|
99
|
final RecordChangeDetector recordChangeDetector,
|
100
|
final boolean alwaysNewRecord,
|
101
|
final MongoDatabase mongodb) {
|
102
|
super();
|
103
|
this.id = id;
|
104
|
this.metadataFormat = metadataFormat;
|
105
|
this.interpretation = interpretation;
|
106
|
this.layout = layout;
|
107
|
this.collection = collection;
|
108
|
this.discardedCollection = mongodb.getCollection("discarded-" + collection.getNamespace().getCollectionName(), DBObject.class);
|
109
|
this.mongoFields = mongoFields;
|
110
|
this.cqlTranslator = cqlTranslator;
|
111
|
this.recordInfoGenerator = recordInfoGenerator;
|
112
|
this.idScheme = idScheme;
|
113
|
this.idNamespace = idNamespace;
|
114
|
this.metadataExtractor = metadataExtractor;
|
115
|
this.recordChangeDetector = recordChangeDetector;
|
116
|
this.alwaysNewRecord = alwaysNewRecord;
|
117
|
}
|
118
|
|
119
|
@Override
|
120
|
public RecordInfo getRecord(final String recordId) {
|
121
|
final Bson query = Filters.eq(OAIConfigurationReader.ID_FIELD, recordId);
|
122
|
final DBObject result = this.collection.find(query).first();
|
123
|
return this.recordInfoGenerator.transformDBObject(result, true);
|
124
|
}
|
125
|
|
126
|
@Override
|
127
|
public RecordInfo getRecord(final String recordId, final Function<String, String> unaryFunction) {
|
128
|
final RecordInfo result = this.getRecord(recordId);
|
129
|
if (result != null) {
|
130
|
final String transformedBody = unaryFunction.apply(result.getMetadata());
|
131
|
result.setMetadata(transformedBody);
|
132
|
}
|
133
|
return result;
|
134
|
}
|
135
|
|
136
|
@Override
|
137
|
public DNetOAIMongoCursor getRecords(final String queryString, final boolean bodyIncluded, final int limit) {
|
138
|
final FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
|
139
|
return new DNetOAIMongoCursor(iter.iterator(), bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
|
140
|
}
|
141
|
|
142
|
@Override
|
143
|
public DNetOAIMongoCursor getRecords(final String queryString,
|
144
|
final Function<String, String> unaryFunction,
|
145
|
final boolean bodyIncluded,
|
146
|
final int limit) {
|
147
|
final FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
|
148
|
return new DNetOAIMongoCursor(iter.iterator(), unaryFunction, bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
|
149
|
}
|
150
|
|
151
|
private FindIterable<DBObject> loggedFindByQuery(final String queryString, final int limit) {
|
152
|
final Bson query = parseQuery(queryString);
|
153
|
long start = System.currentTimeMillis();
|
154
|
Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("_id"));
|
155
|
FindIterable<DBObject> iter = this.collection.find(query).sort(sortByIdAsc).limit(limit);
|
156
|
long end = System.currentTimeMillis();
|
157
|
log.debug("Query:" + query + "\ntime to get mongo iterable (ms): " + (end - start));
|
158
|
return iter;
|
159
|
}
|
160
|
|
161
|
private Bson parseQuery(final String query) {
|
162
|
try {
|
163
|
return cqlTranslator.toMongo(query);
|
164
|
} catch(Exception e ) {
|
165
|
throw new OaiPublisherRuntimeException(e);
|
166
|
}
|
167
|
}
|
168
|
|
169
|
|
170
|
@Override
|
171
|
public List<PublisherField> getIndices() {
|
172
|
return this.mongoFields;
|
173
|
}
|
174
|
|
175
|
/**
|
176
|
* <p>
|
177
|
* Ensure indices on the configuration-defined fields and on the system fields DATESTAMP_FIELD and LAST_COLLECTION_DATE_FIELD.
|
178
|
* <p>
|
179
|
* <p>
|
180
|
* Note that by default ID_FIELD, SET_FIELD, DELETED_FIELD, BODY_FIELD, UPDATED_FIELD are not indexed. If you want an index on those,
|
181
|
* then you have to specify it in the configuration file of the OAI Publisher: <br>
|
182
|
* <INDEX name="objIdentifier" repeatable="false">
|
183
|
* <SOURCE interpretation="transformed" layout="store" name="DMF" path="//*[local-name()='objIdentifier']"/>
|
184
|
* </INDEX>
|
185
|
* </p>
|
186
|
* <p>
|
187
|
* {@inheritDoc}
|
188
|
*/
|
189
|
@Override
|
190
|
public void ensureIndices() {
|
191
|
final IndexOptions indexOptions = new IndexOptions().background(true);
|
192
|
final Stopwatch sw = Stopwatch.createUnstarted();
|
193
|
sw.start();
|
194
|
for (PublisherField field : this.mongoFields) {
|
195
|
BasicDBObject mongoIdx = new BasicDBObject(field.getFieldName(), 1);
|
196
|
log.debug("Creating index on store "+id+" : " + mongoIdx);
|
197
|
this.collection.createIndex(mongoIdx, indexOptions);
|
198
|
}
|
199
|
log.debug("Creating index over : " + OAIConfigurationReader.DATESTAMP_FIELD);
|
200
|
this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.DATESTAMP_FIELD, 1), indexOptions);
|
201
|
log.debug("Creating index over : " + OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD);
|
202
|
this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, 1), indexOptions);
|
203
|
sw.stop();
|
204
|
log.info("All indexes have been updated in " + sw.elapsed(TimeUnit.MILLISECONDS) + " milliseconds");
|
205
|
}
|
206
|
|
207
|
/**
|
208
|
* Creates a compound index over the specified fields on the given store.
|
209
|
* <p>
|
210
|
* The creation is performed on the background
|
211
|
* </p>
|
212
|
*
|
213
|
* @param fieldNames
|
214
|
* List of fields to be included in the compound index
|
215
|
* @theStore MongoPublisherStore where to create the index
|
216
|
*/
|
217
|
public void createCompoundIndex(final List<String> fieldNames) {
|
218
|
if ((fieldNames == null) || fieldNames.isEmpty()) {
|
219
|
log.fatal("No fields specified for the creation of the compound index");
|
220
|
}
|
221
|
BasicDBObjectBuilder theIndexBuilder = BasicDBObjectBuilder.start();
|
222
|
for (String f : fieldNames) {
|
223
|
theIndexBuilder.add(f, 1);
|
224
|
}
|
225
|
BasicDBObject theIndex = (BasicDBObject) theIndexBuilder.get();
|
226
|
log.info("Creating index " + theIndex + " on " + this.getId());
|
227
|
this.getCollection().createIndex(theIndex, new IndexOptions().background(true));
|
228
|
}
|
229
|
|
230
|
private void dropDiscarded(final String source) {
|
231
|
if (StringUtils.isBlank(source)) {
|
232
|
log.debug("Dropping discarded records from publisherStore " + this.id);
|
233
|
this.discardedCollection.drop();
|
234
|
} else {
|
235
|
log.debug("Dropping discarded records for source " + source + " from publisherStore " + this.id);
|
236
|
this.discardedCollection.deleteMany(Filters.eq(OAIConfigurationReader.SET_FIELD, source));
|
237
|
}
|
238
|
}
|
239
|
|
240
|
@Override
|
241
|
public int feed(final Iterable<String> records, final String source) {
|
242
|
final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(80);
|
243
|
final Object sentinel = new Object();
|
244
|
this.dropDiscarded(source);
|
245
|
final Date feedDate = new Date();
|
246
|
final Thread background = new Thread(() -> {
|
247
|
// For fast feeding we want to use a collection with unack write concern
|
248
|
final MongoCollection<DBObject> unackCollection = MongoPublisherStore.this.collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED);
|
249
|
while (true) {
|
250
|
try {
|
251
|
final Object record = queue.take();
|
252
|
if (record == sentinel) {
|
253
|
break;
|
254
|
}
|
255
|
safeFeedRecord((String) record, source, feedDate, unackCollection);
|
256
|
} catch (final InterruptedException e) {
|
257
|
log.fatal("got exception in background thread", e);
|
258
|
throw new IllegalStateException(e);
|
259
|
}
|
260
|
}
|
261
|
});
|
262
|
background.start();
|
263
|
final long startFeed = feedDate.getTime();
|
264
|
try {
|
265
|
log.info("feeding publisherStore " + this.id);
|
266
|
for (final String record : records) {
|
267
|
queue.put(record);
|
268
|
}
|
269
|
queue.put(sentinel);
|
270
|
log.info("finished feeding publisherStore " + this.id);
|
271
|
|
272
|
background.join();
|
273
|
} catch (final InterruptedException e) {
|
274
|
throw new IllegalStateException(e);
|
275
|
}
|
276
|
final long endFeed = System.currentTimeMillis();
|
277
|
log.fatal("OAI STORE " + this.id + " FEEDING COMPLETED IN " + (endFeed - startFeed) + "ms");
|
278
|
this.setDeletedFlags(feedDate, source);
|
279
|
//Let's add the set here so we can avoid to add in the workflow another job that upsert the sets.
|
280
|
if(StringUtils.isNotBlank(source)) {
|
281
|
this.upsertSets(Lists.newArrayList(source));
|
282
|
}
|
283
|
return this.count();
|
284
|
}
|
285
|
|
286
|
/**
|
287
|
* Launches the thread that flags the records to be considered as 'deleted'.
|
288
|
* <p>
|
289
|
* The datestamp of the deleted records must be updated as well, according to the OAI specs available at
|
290
|
* http://www.openarchives.org/OAI/openarchivesprotocol.html#DeletedRecords: if a repository does keep track of deletions then the
|
291
|
* datestamp of the deleted record must be the date and time that it was deleted.
|
292
|
* </p>
|
293
|
*
|
294
|
* @param feedDate
|
295
|
* @param source
|
296
|
*/
|
297
|
private void setDeletedFlags(final Date feedDate, final String source) {
|
298
|
// get the collection with ACKNOWLEDGE Write concern
|
299
|
final MongoCollection<DBObject> ackCollection = this.collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
300
|
final Thread deletedSetter = new Thread(new Runnable() {
|
301
|
|
302
|
@Override
|
303
|
public void run() {
|
304
|
Bson filter = Filters.and(Filters.eq(OAIConfigurationReader.DELETED_FIELD, false),
|
305
|
Filters.lt(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate));
|
306
|
|
307
|
if (!StringUtils.isBlank(source)) {
|
308
|
filter = Filters.and(filter, Filters.eq(OAIConfigurationReader.SET_FIELD, mongoSetCollection.normalizeSetSpec(source)));
|
309
|
}
|
310
|
log.debug("Delete flag query: " + filter);
|
311
|
final BasicDBObject update = new BasicDBObject("$set",
|
312
|
BasicDBObjectBuilder.start(OAIConfigurationReader.DELETED_FIELD, true).append(OAIConfigurationReader.DATESTAMP_FIELD, feedDate)
|
313
|
.append(OAIConfigurationReader.UPDATED_FIELD, true).get());
|
314
|
log.debug("Updating as: " + update.toString());
|
315
|
final UpdateResult updateResult = ackCollection.updateMany(filter, update, new UpdateOptions().upsert(false));
|
316
|
log.debug("Deleted flags set for source: " + source + " #records = " + updateResult.getModifiedCount());
|
317
|
}
|
318
|
});
|
319
|
|
320
|
deletedSetter.start();
|
321
|
try {
|
322
|
deletedSetter.join();
|
323
|
} catch (final InterruptedException e) {
|
324
|
throw new IllegalStateException(e);
|
325
|
}
|
326
|
}
|
327
|
|
328
|
@Override
|
329
|
public void drop() {
|
330
|
this.collection.drop();
|
331
|
}
|
332
|
|
333
|
@Override
|
334
|
public void drop(final String queryString) {
|
335
|
Bson query = parseQuery(queryString);
|
336
|
final DeleteResult deleteResult = this.collection.deleteMany(query);
|
337
|
log.debug("Deleted by query: " + queryString + " #deleted: " + deleteResult.getDeletedCount());
|
338
|
}
|
339
|
|
340
|
@Override
|
341
|
public int count() {
|
342
|
return (int) this.collection.count();
|
343
|
}
|
344
|
|
345
|
@Override
|
346
|
public int count(final String queryString) {
|
347
|
if (StringUtils.isBlank(queryString)) return (int) this.collection.count();
|
348
|
Bson query = parseQuery(queryString);
|
349
|
return (int) this.collection.count(query);
|
350
|
}
|
351
|
|
352
|
public List<String> getDistinctSetNamesFromRecords() {
|
353
|
log.info("Going to ask for all distinct sets in the oaistore " + this.id + ": this may take a long time...");
|
354
|
return Lists.newArrayList(this.collection.distinct(OAIConfigurationReader.SET_FIELD, String.class));
|
355
|
}
|
356
|
|
357
|
// ***********************************************************************************************//
|
358
|
// Feed utilities
|
359
|
// ***********************************************************************************************//
|
360
|
private boolean safeFeedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
|
361
|
try {
|
362
|
if (!record.isEmpty()) { return feedRecord(record, source, feedDate, unackCollection); }
|
363
|
} catch (final Throwable e) {
|
364
|
log.error("Got unhandled exception while parsing record", e);
|
365
|
this.discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record));
|
366
|
}
|
367
|
return false;
|
368
|
}
|
369
|
|
370
|
/**
|
371
|
* Feed the record to the store.
|
372
|
*
|
373
|
* @return true if the record is new, false otherwise
|
374
|
*/
|
375
|
private boolean feedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
|
376
|
final PublisherRecordParser parser = new PublisherRecordParser(this.mongoFields);
|
377
|
log.debug("configured parser for fields: "+this.mongoFields);
|
378
|
final Multimap<String, String> recordProperties = parser.parseRecord(record, source);
|
379
|
String id;
|
380
|
String oaiID;
|
381
|
if (recordProperties.containsKey(OAIConfigurationReader.ID_FIELD)) {
|
382
|
id = recordProperties.get(OAIConfigurationReader.ID_FIELD).iterator().next();
|
383
|
oaiID = getOAIIdentifier(id);
|
384
|
if (isNewRecord(oaiID)) {
|
385
|
feedNew(oaiID, record, recordProperties, feedDate, unackCollection);
|
386
|
return true;
|
387
|
} else {
|
388
|
if (isChanged(oaiID, record)) {
|
389
|
updateRecord(oaiID, record, recordProperties, feedDate, unackCollection);
|
390
|
} else {
|
391
|
// it is not changed, I only have to update the last collection date
|
392
|
handleRecord(oaiID, feedDate, unackCollection);
|
393
|
}
|
394
|
}
|
395
|
} else {
|
396
|
log.error("parsed record seems invalid -- no identifier property with name: " + OAIConfigurationReader.ID_FIELD);
|
397
|
log.error("Extracted property map: \n"+recordProperties);
|
398
|
log.debug("from: \n"+record);
|
399
|
this.discardedCollection
|
400
|
.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record).append(
|
401
|
OAIConfigurationReader.DATESTAMP_FIELD, feedDate));
|
402
|
}
|
403
|
return false;
|
404
|
}
|
405
|
|
406
|
private BasicDBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties) {
|
407
|
final BasicDBObject obj = new BasicDBObject();
|
408
|
for (final String key : recordProperties.keySet()) {
|
409
|
if (key.equals(OAIConfigurationReader.ID_FIELD)) {
|
410
|
obj.put(key, oaiID);
|
411
|
} else {
|
412
|
final Collection<String> values = recordProperties.get(key);
|
413
|
if (key.equals(OAIConfigurationReader.SET_FIELD)) {
|
414
|
|
415
|
final Iterable<String> setSpecs =
|
416
|
values.stream().map(s -> MongoPublisherStore.this.mongoSetCollection.normalizeSetSpec(s)).collect(Collectors.toList());
|
417
|
obj.put(key, setSpecs);
|
418
|
} else {
|
419
|
// let's check if the key is the name of a repeatable field or not
|
420
|
final PublisherField keyField = Iterables.find(this.mongoFields, field -> field.getFieldName().equals(key), null);
|
421
|
if (keyField == null) {
|
422
|
log.warn("Expected field to index: " + key + " could not be found, but we keep going...");
|
423
|
}
|
424
|
if ((keyField != null) && !keyField.isRepeatable()) {
|
425
|
if ((values != null) && !values.isEmpty()) {
|
426
|
obj.put(key, values.iterator().next());
|
427
|
}
|
428
|
} else {
|
429
|
obj.put(key, values);
|
430
|
}
|
431
|
}
|
432
|
}
|
433
|
}
|
434
|
try {
|
435
|
obj.put(OAIConfigurationReader.BODY_FIELD, createCompressRecord(record));
|
436
|
obj.put(OAIConfigurationReader.DELETED_FIELD, false);
|
437
|
return obj;
|
438
|
} catch (final IOException e) {
|
439
|
throw new OaiPublisherRuntimeException(e);
|
440
|
}
|
441
|
}
|
442
|
|
443
|
/**
|
444
|
* @param record
|
445
|
* @throws IOException
|
446
|
*/
|
447
|
public Binary createCompressRecord(final String record) throws IOException {
|
448
|
final ByteArrayOutputStream os = new ByteArrayOutputStream();
|
449
|
final ZipOutputStream zos = new ZipOutputStream(os);
|
450
|
final ZipEntry entry = new ZipEntry(OAIConfigurationReader.BODY_FIELD);
|
451
|
zos.putNextEntry(entry);
|
452
|
zos.write(record.getBytes());
|
453
|
zos.closeEntry();
|
454
|
zos.flush();
|
455
|
zos.close();
|
456
|
return new Binary(os.toByteArray());
|
457
|
}
|
458
|
|
459
|
private void feedNew(final String oaiID,
|
460
|
final String record,
|
461
|
final Multimap<String, String> recordProperties,
|
462
|
final Date feedDate,
|
463
|
final MongoCollection<DBObject> unackCollection) {
|
464
|
log.debug("New record received. Assigned oai id: " + oaiID);
|
465
|
final DBObject obj = this.createBasicObject(oaiID, record, recordProperties);
|
466
|
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
|
467
|
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
|
468
|
obj.put(OAIConfigurationReader.UPDATED_FIELD, false);
|
469
|
unackCollection.insertOne(obj);
|
470
|
this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD));
|
471
|
}
|
472
|
|
473
|
private void updateRecord(final String oaiID,
|
474
|
final String record,
|
475
|
final Multimap<String, String> recordProperties,
|
476
|
final Date feedDate,
|
477
|
final MongoCollection<DBObject> unackCollection) {
|
478
|
log.debug("updating record " + oaiID);
|
479
|
final BasicDBObject obj = this.createBasicObject(oaiID, record, recordProperties);
|
480
|
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
|
481
|
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
|
482
|
obj.put(OAIConfigurationReader.UPDATED_FIELD, true);
|
483
|
final Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
|
484
|
unackCollection.replaceOne(oldObj, obj, new UpdateOptions().upsert(true));
|
485
|
// unackCollection.updateOne(oldObj, new Document("$set",obj), new UpdateOptions().upsert(true));
|
486
|
this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD));
|
487
|
}
|
488
|
|
489
|
public void upsertSets(final Iterable<String> setNames) {
|
490
|
// feed the list of sets, if any
|
491
|
if (setNames != null) {
|
492
|
for (final String setName : setNames) {
|
493
|
if (StringUtils.isNotBlank(setName)) {
|
494
|
final SetInfo set = new SetInfo();
|
495
|
final String setSpec = this.mongoSetCollection.normalizeSetSpec(setName);
|
496
|
set.setSetSpec(setSpec);
|
497
|
set.setSetName(setName);
|
498
|
set.setSetDescription("This set contains metadata records whose provenance is " + setName);
|
499
|
set.setEnabled(true);
|
500
|
final String query = "(" + OAIConfigurationReader.SET_FIELD + " = \"" + setSpec + "\") ";
|
501
|
set.setQuery(query);
|
502
|
this.mongoSetCollection.upsertSet(set, false, getCollection().getNamespace().getDatabaseName());
|
503
|
}
|
504
|
}
|
505
|
}
|
506
|
}
|
507
|
|
508
|
private void handleRecord(final String oaiID, final Date lastCollectionDate, final MongoCollection<DBObject> unackCollection) {
|
509
|
log.debug("handling unchanged record " + oaiID);
|
510
|
final Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
|
511
|
final BasicDBObject update = new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, lastCollectionDate));
|
512
|
unackCollection.updateOne(oldObj, update, new UpdateOptions().upsert(true));
|
513
|
}
|
514
|
|
515
|
private boolean isNewRecord(final String oaiIdentifier) {
|
516
|
if (this.alwaysNewRecord || (this.collection.count() == 0)) { return true; }
|
517
|
return this.collection.find(Filters.eq(OAIConfigurationReader.ID_FIELD, oaiIdentifier)).first() == null;
|
518
|
}
|
519
|
|
520
|
// ***********************************************************************************************//
|
521
|
// Setters / Getters / Basic utilities
|
522
|
// ***********************************************************************************************//
|
523
|
|
524
|
private boolean isChanged(final String oaiID, final String record) {
|
525
|
final RecordInfo oldRecord = getRecord(oaiID);
|
526
|
if (oldRecord == null) { return StringUtils.isBlank(record); }
|
527
|
return this.recordChangeDetector.differs(oldRecord.getMetadata(), record);
|
528
|
}
|
529
|
|
530
|
private String getOAIIdentifier(final String id) {
|
531
|
return this.idScheme + ":" + this.idNamespace + ":" + id;
|
532
|
}
|
533
|
|
534
|
@Override
|
535
|
public int hashCode() {
|
536
|
final int prime = 31;
|
537
|
int result = 1;
|
538
|
result = (prime * result) + ((this.collection == null) ? 0 : this.collection.hashCode());
|
539
|
result = (prime * result) + ((this.id == null) ? 0 : this.id.hashCode());
|
540
|
result = (prime * result) + ((this.interpretation == null) ? 0 : this.interpretation.hashCode());
|
541
|
result = (prime * result) + ((this.layout == null) ? 0 : this.layout.hashCode());
|
542
|
result = (prime * result) + ((this.metadataFormat == null) ? 0 : this.metadataFormat.hashCode());
|
543
|
return result;
|
544
|
}
|
545
|
|
546
|
@Override
|
547
|
public boolean equals(final Object obj) {
|
548
|
if (this == obj) { return true; }
|
549
|
if (obj == null) { return false; }
|
550
|
if (!(obj instanceof MongoPublisherStore)) { return false; }
|
551
|
final MongoPublisherStore other = (MongoPublisherStore) obj;
|
552
|
if (this.collection == null) {
|
553
|
if (other.collection != null) { return false; }
|
554
|
} else if (!this.collection.equals(other.collection)) { return false; }
|
555
|
if (this.id == null) {
|
556
|
if (other.id != null) { return false; }
|
557
|
} else if (!this.id.equals(other.id)) { return false; }
|
558
|
if (this.interpretation == null) {
|
559
|
if (other.interpretation != null) { return false; }
|
560
|
} else if (!this.interpretation.equals(other.interpretation)) { return false; }
|
561
|
if (this.layout == null) {
|
562
|
if (other.layout != null) { return false; }
|
563
|
} else if (!this.layout.equals(other.layout)) { return false; }
|
564
|
if (this.metadataFormat == null) {
|
565
|
if (other.metadataFormat != null) { return false; }
|
566
|
} else if (!this.metadataFormat.equals(other.metadataFormat)) { return false; }
|
567
|
return true;
|
568
|
}
|
569
|
|
570
|
public MongoCollection<DBObject> getCollection() {
|
571
|
return this.collection;
|
572
|
}
|
573
|
|
574
|
public void setCollection(final MongoCollection<DBObject> collection) {
|
575
|
this.collection = collection;
|
576
|
}
|
577
|
|
578
|
public MongoCollection<DBObject> getDiscardedCollection() {
|
579
|
return this.discardedCollection;
|
580
|
}
|
581
|
|
582
|
public void setDiscardedCollection(final MongoCollection<DBObject> discardedCollection) {
|
583
|
this.discardedCollection = discardedCollection;
|
584
|
}
|
585
|
|
586
|
public String getIdScheme() {
|
587
|
return this.idScheme;
|
588
|
}
|
589
|
|
590
|
public void setIdScheme(final String idScheme) {
|
591
|
this.idScheme = idScheme;
|
592
|
}
|
593
|
|
594
|
public String getIdNamespace() {
|
595
|
return this.idNamespace;
|
596
|
}
|
597
|
|
598
|
public void setIdNamespace(final String idNamespace) {
|
599
|
this.idNamespace = idNamespace;
|
600
|
}
|
601
|
|
602
|
public RecordInfoGenerator getRecordInfoGenerator() {
|
603
|
return this.recordInfoGenerator;
|
604
|
}
|
605
|
|
606
|
public void setRecordInfoGenerator(final RecordInfoGenerator recordInfoGenerator) {
|
607
|
this.recordInfoGenerator = recordInfoGenerator;
|
608
|
}
|
609
|
|
610
|
public MetadataExtractor getMetadataExtractor() {
|
611
|
return this.metadataExtractor;
|
612
|
}
|
613
|
|
614
|
public void setMetadataExtractor(final MetadataExtractor metadataExtractor) {
|
615
|
this.metadataExtractor = metadataExtractor;
|
616
|
}
|
617
|
|
618
|
public RecordChangeDetector getRecordChangeDetector() {
|
619
|
return this.recordChangeDetector;
|
620
|
}
|
621
|
|
622
|
public void setRecordChangeDetector(final RecordChangeDetector recordChangeDetector) {
|
623
|
this.recordChangeDetector = recordChangeDetector;
|
624
|
}
|
625
|
|
626
|
@Override
|
627
|
public String getId() {
|
628
|
return this.id;
|
629
|
}
|
630
|
|
631
|
public void setId(final String id) {
|
632
|
this.id = id;
|
633
|
}
|
634
|
|
635
|
@Override
|
636
|
public String getMetadataFormat() {
|
637
|
return this.metadataFormat;
|
638
|
}
|
639
|
|
640
|
public void setMetadataFormat(final String metadataFormat) {
|
641
|
this.metadataFormat = metadataFormat;
|
642
|
}
|
643
|
|
644
|
@Override
|
645
|
public String getInterpretation() {
|
646
|
return this.interpretation;
|
647
|
}
|
648
|
|
649
|
public void setInterpretation(final String interpretation) {
|
650
|
this.interpretation = interpretation;
|
651
|
}
|
652
|
|
653
|
@Override
|
654
|
public String getLayout() {
|
655
|
return this.layout;
|
656
|
}
|
657
|
|
658
|
public void setLayout(final String layout) {
|
659
|
this.layout = layout;
|
660
|
}
|
661
|
|
662
|
public MongoSetCollection getMongoSetCollection() {
|
663
|
return this.mongoSetCollection;
|
664
|
}
|
665
|
|
666
|
public void setMongoSetCollection(final MongoSetCollection mongoSetCollection) {
|
667
|
this.mongoSetCollection = mongoSetCollection;
|
668
|
}
|
669
|
|
670
|
public List<PublisherField> getMongoFields() {
|
671
|
return this.mongoFields;
|
672
|
}
|
673
|
|
674
|
public void setMongoFields(final List<PublisherField> mongoFields) {
|
675
|
this.mongoFields = mongoFields;
|
676
|
}
|
677
|
|
678
|
public boolean isAlwaysNewRecord() {
|
679
|
return this.alwaysNewRecord;
|
680
|
}
|
681
|
|
682
|
public void setAlwaysNewRecord(final boolean alwaysNewRecord) {
|
683
|
this.alwaysNewRecord = alwaysNewRecord;
|
684
|
}
|
685
|
|
686
|
|
687
|
|
688
|
}
|