Project

General

Profile

1
package eu.dnetlib.data.oai.store.mongo;
2

    
3
import java.io.IOException;
4
import java.util.Collection;
5
import java.util.Date;
6
import java.util.List;
7
import java.util.concurrent.ArrayBlockingQueue;
8
import java.util.concurrent.BlockingQueue;
9
import java.util.concurrent.TimeUnit;
10
import java.util.zip.ZipEntry;
11
import java.util.zip.ZipOutputStream;
12

    
13
import com.google.common.base.Function;
14
import com.google.common.base.Predicate;
15
import com.google.common.base.Stopwatch;
16
import com.google.common.collect.Iterables;
17
import com.google.common.collect.Lists;
18
import com.google.common.collect.Multimap;
19
import com.mongodb.BasicDBObject;
20
import com.mongodb.BasicDBObjectBuilder;
21
import com.mongodb.DBObject;
22
import com.mongodb.WriteConcern;
23
import com.mongodb.client.FindIterable;
24
import com.mongodb.client.ListIndexesIterable;
25
import com.mongodb.client.MongoCollection;
26
import com.mongodb.client.MongoDatabase;
27
import com.mongodb.client.model.Filters;
28
import com.mongodb.client.model.IndexOptions;
29
import com.mongodb.client.model.Sorts;
30
import com.mongodb.client.model.UpdateOptions;
31
import com.mongodb.client.result.DeleteResult;
32
import com.mongodb.client.result.UpdateResult;
33
import eu.dnetlib.data.information.oai.publisher.OaiPublisherRuntimeException;
34
import eu.dnetlib.data.information.oai.publisher.PublisherField;
35
import eu.dnetlib.data.information.oai.publisher.conf.OAIConfigurationReader;
36
import eu.dnetlib.data.information.oai.publisher.info.RecordInfo;
37
import eu.dnetlib.data.information.oai.publisher.info.SetInfo;
38
import eu.dnetlib.data.oai.store.PublisherStore;
39
import eu.dnetlib.data.oai.store.RecordChangeDetector;
40
import eu.dnetlib.data.oai.store.parser.PublisherRecordParser;
41
import eu.dnetlib.data.oai.store.sets.MongoSetCollection;
42
import eu.dnetlib.functionality.cql.mongo.MongoCqlTranslator;
43
import eu.dnetlib.miscutils.functional.UnaryFunction;
44
import org.apache.commons.io.output.ByteArrayOutputStream;
45
import org.apache.commons.lang3.StringUtils;
46
import org.apache.commons.logging.Log;
47
import org.apache.commons.logging.LogFactory;
48
import org.bson.conversions.Bson;
49
import org.bson.types.Binary;
50

    
51
public class MongoPublisherStore implements PublisherStore<DNetOAIMongoCursor> {
52

    
53
	private static final Log log = LogFactory.getLog(MongoPublisherStore.class); // NOPMD by marko on 11/24/08 5:02 PM
54

    
55
	private String id, metadataFormat, interpretation, layout;
56
	/** Keeps information about the fields to be created in mongo. **/
57
	private List<PublisherField> mongoFields;
58

    
59
	private MongoCollection<DBObject> collection;
60
	private MongoCollection<DBObject> discardedCollection;
61

    
62
	private RecordInfoGenerator recordInfoGenerator;
63
	private MetadataExtractor metadataExtractor;
64

    
65
	private RecordChangeDetector recordChangeDetector;
66

    
67
	private MongoSetCollection mongoSetCollection;
68

    
69
	/**
70
	 * Used to generate the OAI identifiers compliant to the protocol. See
71
	 * http://www.openarchives.org/OAI/openarchivesprotocol.html#UniqueIdentifier.
72
	 */
73
	private String idScheme;
74
	/**
75
	 * Used to generate the OAI identifiers compliant to the protocol. See
76
	 * http://www.openarchives.org/OAI/openarchivesprotocol.html#UniqueIdentifier.
77
	 */
78
	private String idNamespace;
79

    
80
	private boolean alwaysNewRecord;
81

    
82
	public MongoPublisherStore() {
83
		super();
84
	}
85

    
86
	public MongoPublisherStore(final String id,
87
			final String metadataFormat,
88
			final String interpretation,
89
			final String layout,
90
			final MongoCollection<DBObject> collection,
91
			final List<PublisherField> mongoFields,
92
			final RecordInfoGenerator recordInfoGenerator,
93
			final String idScheme,
94
			final String idNamespace,
95
			final MetadataExtractor metadataExtractor,
96
			final RecordChangeDetector recordChangeDetector,
97
			final boolean alwaysNewRecord,
98
			final MongoDatabase mongodb) {
99
		super();
100
		this.id = id;
101
		this.metadataFormat = metadataFormat;
102
		this.interpretation = interpretation;
103
		this.layout = layout;
104
		this.collection = collection;
105
		this.discardedCollection = mongodb.getCollection("discarded-" + collection.getNamespace().getCollectionName(), DBObject.class);
106
		this.mongoFields = mongoFields;
107
		this.recordInfoGenerator = recordInfoGenerator;
108
		this.idScheme = idScheme;
109
		this.idNamespace = idNamespace;
110
		this.recordChangeDetector = recordChangeDetector;
111
		this.alwaysNewRecord = alwaysNewRecord;
112
	}
113

    
114
	@Override
115
	public RecordInfo getRecord(final String recordId) {
116
		Bson query = Filters.eq(OAIConfigurationReader.ID_FIELD, recordId);
117
		DBObject result = this.collection.find(query).first();
118
		log.debug(result);
119
		return this.recordInfoGenerator.transformDBObject(result, true);
120
	}
121

    
122
	@Override
123
	public RecordInfo getRecord(final String recordId, final UnaryFunction<String, String> unaryFunction) {
124
		RecordInfo result = this.getRecord(recordId);
125
		if (result != null) {
126
			String transformedBody = unaryFunction.evaluate(result.getMetadata());
127
			result.setMetadata(transformedBody);
128
		}
129
		return result;
130
	}
131

    
132
	@Override
133
	public DNetOAIMongoCursor getRecords(final String queryString, final boolean bodyIncluded, final int limit) {
134
		FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
135
		return new DNetOAIMongoCursor(iter.iterator(), bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
136
	}
137

    
138
	@Override
139
	public DNetOAIMongoCursor getRecords(final String queryString,
140
			final UnaryFunction<String, String> unaryFunction,
141
			final boolean bodyIncluded,
142
			final int limit) {
143
		FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
144
		return new DNetOAIMongoCursor(iter.iterator(), unaryFunction, bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
145
	}
146

    
147
	private FindIterable<DBObject> loggedFindByQuery(final String queryString, final int limit) {
148
		final Bson query = parseQuery(queryString);
149
		long start = System.currentTimeMillis();
150
		Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("_id"));
151
		FindIterable<DBObject> iter = this.collection.find(query).sort(sortByIdAsc).limit(limit);
152
		long end = System.currentTimeMillis();
153
		log.debug("Query:" + query + "\ntime to get mongo iterable (ms): " + (end - start));
154
		return iter;
155
	}
156

    
157
	private Bson parseQuery(final String query) {
158
		try {
159
			return MongoCqlTranslator.toMongo(query);
160
		} catch(Exception e ) {
161
			throw new OaiPublisherRuntimeException(e);
162
		}
163
	}
164

    
165
	@Override
166
	public List<PublisherField> getIndices() {
167
		return this.mongoFields;
168
	}
169

    
170
	/**
171
	 * <p>
172
	 * Ensure indices on the configuration-defined fields and on the system fields DATESTAMP_FIELD and LAST_COLLECTION_DATE_FIELD.
173
	 * <p>
174
	 * <p>
175
	 * Note that by default ID_FIELD, SET_FIELD, DELETED_FIELD, BODY_FIELD, UPDATED_FIELD are not indexed. If you want an index on those,
176
	 * then you have to specify it in the configuration file of the OAI Publisher: <br>
177
	 * <INDEX name="deleted">
178
	 * </p>
179
	 *
180
	 * {@inheritDoc}
181
	 *
182
	 */
183
	@Override
184
	public void ensureIndices() {
185
		final ListIndexesIterable<BasicDBObject> indexesIterable = this.collection.listIndexes(BasicDBObject.class);
186
		final IndexOptions indexOptions = new IndexOptions().background(true);
187
		Stopwatch sw = Stopwatch.createUnstarted();
188
		sw.start();
189
		// I want to keep the composite indexes that might have been defined manually
190
		// I DO NOT NEED TO DO THIS. If the indexes are there, they are there.
191
		/*log.debug("Ensuring currently defined composite indexes on store "+id+": ");
192
		for (BasicDBObject o : indexesIterable) {
193
			BasicDBObject fieldIndexed = (BasicDBObject) o.get("key");
194
			if (fieldIndexed.keySet().size() > 1) {
195
				log.debug(o);
196
				this.collection.createIndex(fieldIndexed, indexOptions);
197
			}
198
		}*/
199
		// Indexes on single fields.
200
		for (PublisherField field : this.mongoFields) {
201
			BasicDBObject mongoIdx = new BasicDBObject(field.getFieldName(), 1);
202
			log.debug("Creating index on store "+id+" : " + mongoIdx);
203
			this.collection.createIndex(mongoIdx, indexOptions);
204
		}
205
		log.debug("Creating index over : " + OAIConfigurationReader.DATESTAMP_FIELD);
206
		this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.DATESTAMP_FIELD, 1), indexOptions);
207
		log.debug("Creating index over : " + OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD);
208
		this.collection.createIndex(new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, 1), indexOptions);
209
		sw.stop();
210
		log.info("All indexes have been updated in " + sw.elapsed(TimeUnit.MILLISECONDS) + " milliseconds");
211
	}
212

    
213
	/**
214
	 * Creates a compound index over the specified fields on the given store.
215
	 * <p>
216
	 * The creation is performed on the background
217
	 * </p>
218
	 *
219
	 * @param fieldNames
220
	 *            List of fields to be included in the compound index
221
	 * @theStore MongoPublisherStore where to create the index
222
	 */
223
	public void createCompoundIndex(final List<String> fieldNames) {
224
		if ((fieldNames == null) || fieldNames.isEmpty()) {
225
			log.fatal("No fields specified for the creation of the compound index");
226
		}
227
		BasicDBObjectBuilder theIndexBuilder = BasicDBObjectBuilder.start();
228
		for (String f : fieldNames) {
229
			theIndexBuilder.add(f, 1);
230
		}
231
		BasicDBObject theIndex = (BasicDBObject) theIndexBuilder.get();
232
		log.info("Creating index " + theIndex + " on " + this.getId());
233
		this.getCollection().createIndex(theIndex, new IndexOptions().background(true));
234
	}
235

    
236
	private void dropDiscarded(final String source) {
237
		if (StringUtils.isBlank(source)) {
238
			log.debug("Dropping discarded records from publisherStore " + id);
239
			discardedCollection.drop();
240
		} else {
241
			log.debug("Dropping discarded records for source " + source + " from publisherStore " + id);
242
			discardedCollection.deleteMany(Filters.eq(OAIConfigurationReader.SET_FIELD, source));
243
		}
244
	}
245

    
246
	@Override
247
	public int feed(final Iterable<String> records, final String source) {
248
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(80);
249
		final Object sentinel = new Object();
250
		this.dropDiscarded(source);
251
		final Date feedDate = new Date();
252
		Thread background = new Thread(new Runnable() {
253

    
254
			@Override
255
			public void run() {
256
				//For fast feeding we want to use a collection with unack write concern
257
				final MongoCollection<DBObject> unackCollection = collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED);
258
				while (true) {
259
					try {
260
						Object record = queue.take();
261
						if (record == sentinel) {
262
							break;
263
						}
264
						safeFeedRecord((String) record, source, feedDate, unackCollection);
265
					} catch (InterruptedException e) {
266
						log.fatal("got exception in background thread", e);
267
						throw new IllegalStateException(e);
268
					}
269
				}
270
			}
271
		});
272
		background.start();
273
		long startFeed = feedDate.getTime();
274
		try {
275
			log.info("feeding publisherStore " + id);
276
			for (final String record : records) {
277
				queue.put(record);
278
			}
279
			queue.put(sentinel);
280
			log.info("finished feeding publisherStore " + id);
281

    
282
			background.join();
283
		} catch (InterruptedException e) {
284
			throw new IllegalStateException(e);
285
		}
286
		long endFeed = System.currentTimeMillis();
287
		log.fatal("OAI STORE " + id + " FEEDING COMPLETED IN " + (endFeed - startFeed) + "ms");
288
		this.setDeletedFlags(feedDate, source);
289
		return this.count();
290
	}
291

    
292
	/**
293
	 * Launches the thread that flags the records to be considered as 'deleted'.
294
	 * <p>
295
	 * The datestamp of the deleted records must be updated as well, according to the OAI specs available at
296
	 * http://www.openarchives.org/OAI/openarchivesprotocol.html#DeletedRecords: if a repository does keep track of deletions then the
297
	 * datestamp of the deleted record must be the date and time that it was deleted.
298
	 * </p>
299
	 *
300
	 * @param feedDate
301
	 * @param source
302
	 */
303
	private void setDeletedFlags(final Date feedDate, final String source) {
304
		//get the collection with ACKNOWLEDGE Write concern
305
		final MongoCollection<DBObject> ackCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
306
		Thread deletedSetter = new Thread(new Runnable() {
307

    
308
			@Override
309
			public void run() {
310
				Bson filter = Filters.and(Filters.eq(OAIConfigurationReader.DELETED_FIELD, false),
311
						Filters.lt(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate));
312
				if (!StringUtils.isBlank(source)) {
313
					filter = Filters.and(filter, Filters.eq(OAIConfigurationReader.SET_FIELD, source));
314
				}
315
				log.debug("Delete flag query: " + filter);
316
				BasicDBObject update = new BasicDBObject("$set",
317
						BasicDBObjectBuilder.start(OAIConfigurationReader.DELETED_FIELD, true).append(OAIConfigurationReader.DATESTAMP_FIELD, feedDate)
318
								.append(OAIConfigurationReader.UPDATED_FIELD, true).get());
319
				log.debug("Updating as: " + update.toString());
320
				final UpdateResult updateResult = ackCollection.updateMany(filter, update, new UpdateOptions().upsert(false));
321
				log.debug("Deleted flags set for source: " + source + " #records = " + updateResult.getModifiedCount());
322
			}
323
		});
324

    
325
		deletedSetter.start();
326
		try {
327
			deletedSetter.join();
328
		} catch (InterruptedException e) {
329
			throw new IllegalStateException(e);
330
		}
331
	}
332

    
333
	@Override
334
	public void drop() {
335
		this.collection.drop();
336
	}
337

    
338
	@Override
339
	public void drop(final String queryString) {
340
		Bson query = parseQuery(queryString);
341
		final DeleteResult deleteResult = this.collection.deleteMany(query);
342
		log.debug("Deleted by query: " + queryString + " #deleted: " + deleteResult.getDeletedCount());
343

    
344
	}
345

    
346
	@Override
347
	public int count() {
348
		return (int) this.collection.count();
349
	}
350

    
351
	@Override
352
	public int count(final String queryString) {
353
		if (StringUtils.isBlank(queryString)) return (int) this.collection.count();
354
		Bson query = parseQuery(queryString);
355
		return (int) this.collection.count(query);
356
	}
357

    
358
	public List<String> getDistinctSetNamesFromRecords() {
359
		log.info("Going to ask for all distinct sets in the oaistore " + id + ": this may take a long time...");
360
		return Lists.newArrayList(this.collection.distinct(OAIConfigurationReader.SET_FIELD, String.class));
361
	}
362

    
363
	// ***********************************************************************************************//
364
	// Feed utilities
365
	// ***********************************************************************************************//
366
	private boolean safeFeedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
367
		try {
368
			if (!record.isEmpty()) return feedRecord(record, source, feedDate, unackCollection);
369
		} catch (final Throwable e) {
370
			log.error("Got unhandled exception while parsing record", e);
371
			discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record));
372
		}
373
		return false;
374
	}
375

    
376
	/**
377
	 * Feed the record to the store.
378
	 *
379
	 * @return true if the record is new, false otherwise
380
	 */
381
	private boolean feedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
382
		PublisherRecordParser parser = new PublisherRecordParser(this.mongoFields);
383
		final Multimap<String, String> recordProperties = parser.parseRecord(record, source);
384
		String id = "";
385
		String oaiID = "";
386
		if (recordProperties.containsKey(OAIConfigurationReader.ID_FIELD)) {
387
			id = recordProperties.get(OAIConfigurationReader.ID_FIELD).iterator().next();
388
			oaiID = getOAIIdentifier(id);
389
			if (isNewRecord(oaiID)) {
390
				feedNew(oaiID, record, recordProperties, feedDate, unackCollection);
391
				return true;
392
			} else {
393
				if (isChanged(oaiID, record)) {
394
					updateRecord(oaiID, record, recordProperties, feedDate, unackCollection);
395
				} else {
396
					// it is not changed, I only have to update the last collection date
397
					handleRecord(oaiID, feedDate, unackCollection);
398
				}
399
			}
400
		} else {
401
			log.error("parsed record seems invalid -- no identifier property with name: " + OAIConfigurationReader.ID_FIELD);
402
			discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record).append(
403
					OAIConfigurationReader.DATESTAMP_FIELD, feedDate));
404
		}
405
		return false;
406
	}
407

    
408
	private BasicDBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties) {
409
		BasicDBObject obj = new BasicDBObject();
410
		for (final String key : recordProperties.keySet()) {
411
			if (key.equals(OAIConfigurationReader.ID_FIELD)) {
412
				obj.put(key, oaiID);
413
			} else {
414
				Collection<String> values = recordProperties.get(key);
415
				if (key.equals(OAIConfigurationReader.SET_FIELD)) {
416

    
417
					Iterable<String> setSpecs = Iterables.transform(values, new Function<String, String>() {
418

    
419
						@Override
420
						public String apply(final String s) {
421
							return mongoSetCollection.normalizeSetSpec(s);
422
						}
423

    
424
					});
425
					obj.put(key, setSpecs);
426
				} else {
427
					// let's check if the key is the name of a repeatable field or not
428
					PublisherField keyField = Iterables.find(this.mongoFields, new Predicate<PublisherField>() {
429

    
430
						@Override
431
						public boolean apply(final PublisherField field) {
432
							return field.getFieldName().equals(key);
433
						}
434
					}, null);
435
					if (keyField == null) {
436
						log.warn("Expected field to index: " + key + " could not be found, but we keep going...");
437
					}
438
					if ((keyField != null) && !keyField.isRepeatable()) {
439
						if ((values != null) && !values.isEmpty()) {
440
							obj.put(key, values.iterator().next());
441
						}
442
					} else {
443
						obj.put(key, values);
444
					}
445
				}
446
			}
447
		}
448
		try {
449
			obj.put(OAIConfigurationReader.BODY_FIELD, createCompressRecord(record));
450
			obj.put(OAIConfigurationReader.DELETED_FIELD, false);
451
			return obj;
452
		} catch (IOException e) {
453
			throw new OaiPublisherRuntimeException(e);
454
		}
455
	}
456

    
457
	/**
458
	 * @param record
459
	 * @throws IOException
460
	 */
461
	public Binary createCompressRecord(final String record) throws IOException {
462
		ByteArrayOutputStream os = new ByteArrayOutputStream();
463
		ZipOutputStream zos = new ZipOutputStream(os);
464
		ZipEntry entry = new ZipEntry(OAIConfigurationReader.BODY_FIELD);
465
		zos.putNextEntry(entry);
466
		zos.write(record.getBytes());
467
		zos.closeEntry();
468
		zos.flush();
469
		zos.close();
470
		return new Binary(os.toByteArray());
471
	}
472

    
473
	private void feedNew(final String oaiID,
474
			final String record,
475
			final Multimap<String, String> recordProperties,
476
			final Date feedDate,
477
			final MongoCollection<DBObject> unackCollection) {
478
		log.debug("New record received. Assigned oai id: " + oaiID);
479
		DBObject obj = this.createBasicObject(oaiID, record, recordProperties);
480
		obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
481
		obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
482
		obj.put(OAIConfigurationReader.UPDATED_FIELD, false);
483
		unackCollection.insertOne(obj);
484
		this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD));
485
	}
486

    
487
	private void updateRecord(final String oaiID,
488
			final String record,
489
			final Multimap<String, String> recordProperties,
490
			final Date feedDate,
491
			final MongoCollection<DBObject> unackCollection) {
492
		log.debug("updating record " + oaiID);
493
		BasicDBObject obj = this.createBasicObject(oaiID, record, recordProperties);
494
		obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate);
495
		obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate);
496
		obj.put(OAIConfigurationReader.UPDATED_FIELD, true);
497
		Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
498
		unackCollection.replaceOne(oldObj, obj, new UpdateOptions().upsert(true));
499
		this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD));
500
	}
501

    
502
	public void upsertSets(final Iterable<String> setNames) {
503
		// feed the list of sets, if any
504
		if (setNames != null) {
505
			for (String setName : setNames) {
506
				if (StringUtils.isNotBlank(setName)) {
507
					final SetInfo set = new SetInfo();
508
					String setSpec = this.mongoSetCollection.normalizeSetSpec(setName);
509
					set.setSetSpec(setSpec);
510
					set.setSetName(setName);
511
					set.setSetDescription("This set contains metadata records whose provenance is " + setName);
512
					set.setEnabled(true);
513
					String query = "(" + OAIConfigurationReader.SET_FIELD + " = \"" + setSpec + "\") ";
514
					set.setQuery(query);
515
					this.mongoSetCollection.upsertSet(set, false, getCollection().getNamespace().getDatabaseName());
516
				}
517
			}
518
		}
519
	}
520

    
521
	private void handleRecord(final String oaiID, final Date lastCollectionDate, final MongoCollection<DBObject> unackCollection) {
522
		log.debug("handling unchanged record " + oaiID);
523
		Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
524
		BasicDBObject update = new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, lastCollectionDate));
525
		unackCollection.updateOne(oldObj, update, new UpdateOptions().upsert(true));
526
	}
527

    
528
	private boolean isNewRecord(final String oaiIdentifier) {
529
		if (alwaysNewRecord || (collection.count() == 0)) return true;
530
		return this.collection.find(Filters.eq(OAIConfigurationReader.ID_FIELD, oaiIdentifier)).first() == null;
531
	}
532

    
533
	// ***********************************************************************************************//
534
	// Setters / Getters / Basic utilities
535
	// ***********************************************************************************************//
536

    
537
	private boolean isChanged(final String oaiID, final String record) {
538
		RecordInfo oldRecord = getRecord(oaiID);
539
		if (oldRecord == null) return StringUtils.isBlank(record);
540
		return this.recordChangeDetector.differs(oldRecord.getMetadata(), record);
541
	}
542

    
543
	private String getOAIIdentifier(final String id) {
544
		return this.idScheme + ":" + this.idNamespace + ":" + id;
545
	}
546

    
547
	@Override
548
	public String toString() {
549
		return "MongoPublisherStore{" +
550
				"id='" + id + '\'' +
551
				", metadataFormat='" + metadataFormat + '\'' +
552
				", interpretation='" + interpretation + '\'' +
553
				", layout='" + layout + '\'' +
554
				", idScheme='" + idScheme + '\'' +
555
				", idNamespace='" + idNamespace + '\'' +
556
				", alwaysNewRecord=" + alwaysNewRecord +
557
				'}';
558
	}
559

    
560
	@Override
561
	public int hashCode() {
562
		final int prime = 31;
563
		int result = 1;
564
		result = (prime * result) + ((collection == null) ? 0 : collection.hashCode());
565
		result = (prime * result) + ((id == null) ? 0 : id.hashCode());
566
		result = (prime * result) + ((interpretation == null) ? 0 : interpretation.hashCode());
567
		result = (prime * result) + ((layout == null) ? 0 : layout.hashCode());
568
		result = (prime * result) + ((metadataFormat == null) ? 0 : metadataFormat.hashCode());
569
		return result;
570
	}
571

    
572
	@Override
573
	public boolean equals(final Object obj) {
574
		if (this == obj) return true;
575
		if (obj == null) return false;
576
		if (!(obj instanceof MongoPublisherStore)) return false;
577
		MongoPublisherStore other = (MongoPublisherStore) obj;
578
		if (collection == null) {
579
			if (other.collection != null) return false;
580
		} else if (!collection.equals(other.collection)) return false;
581
		if (id == null) {
582
			if (other.id != null) return false;
583
		} else if (!id.equals(other.id)) return false;
584
		if (interpretation == null) {
585
			if (other.interpretation != null) return false;
586
		} else if (!interpretation.equals(other.interpretation)) return false;
587
		if (layout == null) {
588
			if (other.layout != null) return false;
589
		} else if (!layout.equals(other.layout)) return false;
590
		if (metadataFormat == null) {
591
			if (other.metadataFormat != null) return false;
592
		} else if (!metadataFormat.equals(other.metadataFormat)) return false;
593
		return true;
594
	}
595

    
596
	public MongoCollection<DBObject> getCollection() {
597
		return collection;
598
	}
599

    
600
	public void setCollection(final MongoCollection<DBObject> collection) {
601
		this.collection = collection;
602
	}
603

    
604
	public MongoCollection<DBObject> getDiscardedCollection() {
605
		return discardedCollection;
606
	}
607

    
608
	public void setDiscardedCollection(final MongoCollection<DBObject> discardedCollection) {
609
		this.discardedCollection = discardedCollection;
610
	}
611

    
612
	public String getIdScheme() {
613
		return idScheme;
614
	}
615

    
616
	public void setIdScheme(final String idScheme) {
617
		this.idScheme = idScheme;
618
	}
619

    
620
	public String getIdNamespace() {
621
		return idNamespace;
622
	}
623

    
624
	public void setIdNamespace(final String idNamespace) {
625
		this.idNamespace = idNamespace;
626
	}
627

    
628
	public RecordInfoGenerator getRecordInfoGenerator() {
629
		return recordInfoGenerator;
630
	}
631

    
632
	public void setRecordInfoGenerator(final RecordInfoGenerator recordInfoGenerator) {
633
		this.recordInfoGenerator = recordInfoGenerator;
634
	}
635

    
636
	public MetadataExtractor getMetadataExtractor() {
637
		return metadataExtractor;
638
	}
639

    
640
	public void setMetadataExtractor(final MetadataExtractor metadataExtractor) {
641
		this.metadataExtractor = metadataExtractor;
642
	}
643

    
644
	public RecordChangeDetector getRecordChangeDetector() {
645
		return recordChangeDetector;
646
	}
647

    
648
	public void setRecordChangeDetector(final RecordChangeDetector recordChangeDetector) {
649
		this.recordChangeDetector = recordChangeDetector;
650
	}
651

    
652
	@Override
653
	public String getId() {
654
		return this.id;
655
	}
656

    
657
	public void setId(final String id) {
658
		this.id = id;
659
	}
660

    
661
	@Override
662
	public String getMetadataFormat() {
663
		return this.metadataFormat;
664
	}
665

    
666
	public void setMetadataFormat(final String metadataFormat) {
667
		this.metadataFormat = metadataFormat;
668
	}
669

    
670
	@Override
671
	public String getInterpretation() {
672
		return this.interpretation;
673
	}
674

    
675
	public void setInterpretation(final String interpretation) {
676
		this.interpretation = interpretation;
677
	}
678

    
679
	@Override
680
	public String getLayout() {
681
		return this.layout;
682
	}
683

    
684
	public void setLayout(final String layout) {
685
		this.layout = layout;
686
	}
687

    
688
	public MongoSetCollection getMongoSetCollection() {
689
		return mongoSetCollection;
690
	}
691

    
692
	public void setMongoSetCollection(final MongoSetCollection mongoSetCollection) {
693
		this.mongoSetCollection = mongoSetCollection;
694
	}
695

    
696
	public List<PublisherField> getMongoFields() {
697
		return mongoFields;
698
	}
699

    
700
	public void setMongoFields(final List<PublisherField> mongoFields) {
701
		this.mongoFields = mongoFields;
702
	}
703

    
704
	public boolean isAlwaysNewRecord() {
705
		return alwaysNewRecord;
706
	}
707

    
708
	public void setAlwaysNewRecord(final boolean alwaysNewRecord) {
709
		this.alwaysNewRecord = alwaysNewRecord;
710
	}
711

    
712
}
(4-4/7)