Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.time.Duration;
4
import java.time.LocalDate;
5
import java.time.ZoneId;
6
import java.util.ArrayList;
7
import java.util.Collections;
8
import java.util.Date;
9
import java.util.List;
10
import java.util.stream.Collectors;
11

    
12
import com.mongodb.BasicDBList;
13
import com.mongodb.BasicDBObject;
14
import com.mongodb.DBObject;
15
import com.mongodb.WriteConcern;
16
import com.mongodb.client.FindIterable;
17
import com.mongodb.client.MongoCollection;
18
import com.mongodb.client.MongoDatabase;
19
import com.mongodb.client.MongoIterable;
20
import com.mongodb.client.model.Filters;
21
import com.mongodb.client.model.UpdateOptions;
22
import eu.dnetlib.data.mdstore.modular.connector.*;
23
import eu.dnetlib.data.mdstore.modular.mongodb.utils.MDStoreUtils;
24
import eu.dnetlib.enabling.tools.DnetStreamSupport;
25
import eu.dnetlib.rmi.data.MDStoreServiceException;
26
import org.apache.commons.lang3.StringUtils;
27
import org.apache.commons.logging.Log;
28
import org.apache.commons.logging.LogFactory;
29
import org.bson.conversions.Bson;
30
import org.springframework.beans.factory.annotation.Required;
31

    
32
/**
33
 * The Class MDStoreTransactionManager.
34
 */
35
public class MDStoreTransactionManagerImpl implements MDStoreTransactionManager {
36

    
37
	/**
38
	 * The Constant log.
39
	 */
40
	private static final Log log = LogFactory.getLog(MDStoreTransactionManagerImpl.class);
41

    
42
	/**
43
	 * The table name.
44
	 */
45
	private static String TABLE_NAME = "metadataManager";
46

    
47
	/**
48
	 * The max number of concurrent transactions per mdstore.
49
	 */
50
	private int maxTransactions = 1;
51

    
52
	/**
53
	 * The db.
54
	 */
55
	private MongoDatabase db;
56

    
57
	/**
58
	 * The manager table.
59
	 */
60
	private MongoCollection<DBObject> managerTable;
61

    
62
	/**
63
	 * The expired days.
64
	 */
65
	private int expiredDays;
66

    
67
	/**
68
	 * Bootstrap manager.
69
	 */
70
	private void bootstrapManager() {
71
		log.debug("Bootstrap Manager start");
72
		final MongoCollection<DBObject> metadataColl = db.getCollection("metadata", DBObject.class);
73
		final FindIterable<DBObject> values = metadataColl.find();
74
		this.setManagerTable(db.getCollection(TABLE_NAME, DBObject.class));
75
		for (DBObject object : values) {
76
			final String id = (String) object.get("mdId");
77
			String newId = id;
78
			if (id.contains("_")) {
79
				newId = StringUtils.substringBefore(id, "_");
80
			}
81
			final BasicDBObject input = new BasicDBObject();
82
			input.put("mdId", id);
83
			input.put("currentId", newId);
84
			input.put("expiring", new String[] {});
85
			input.put("transactions", new String[] {});
86
			getManagerTable().insertOne(input);
87
			log.debug(String.format("Added %s to Metadata Manager data structure", id));
88

    
89
		}
90
		final BasicDBObject ensureIndex = new BasicDBObject();
91
		ensureIndex.put("mdId", 1);
92
		log.debug("Create index in MetadaManager ");
93
		this.getManagerTable().createIndex(ensureIndex);
94
	}
95

    
96
	/**
97
	 * Gets the DBObject describing an mdstore. null if there is no mdstore with the given id.
98
	 *
99
	 * @param mdstoreID
100
	 * @return DBObject or null
101
	 */
102
	private DBObject getMDStoreAsDBObject(String mdstoreID) throws MDStoreServiceException {
103
		final BasicDBObject query = new BasicDBObject();
104
		query.put("mdId", mdstoreID);
105
		final FindIterable<DBObject> it = this.getManagerTable().find(query);
106
		DBObject mdstoreInfo = it.first();
107
		return mdstoreInfo;
108
	}
109

    
110
	/**
111
	 * Verify consistency.
112
	 *
113
	 * @throws MDStoreServiceException the MD store service exception
114
	 */
115
	@Override
116
	public void verifyConsistency() throws MDStoreServiceException {
117
		if (this.getManagerTable() == null) {
118
			final int size = DnetStreamSupport.generateStreamFromIterator(db.listCollectionNames().iterator()).filter(it -> it.contains(TABLE_NAME))
119
					.collect(Collectors.toList()).size();
120
			if (size == 0)
121
				bootstrapManager();
122
			else {
123
				this.setManagerTable(db.getCollection(TABLE_NAME, DBObject.class));
124
			}
125
		}
126
		if (this.getManagerTable() == null) throw new MDStoreServiceException("Something bad happen, unable to create managerTable");
127
	}
128

    
129
	/**
130
	 * {@inheritDoc}
131
	 *
132
	 * @see MDStoreTransactionManager#createMDStore(String)
133
	 */
134
	@Override
135
	public void createMDStore(final String mdId) throws MDStoreServiceException {
136
		log.debug("Creating new mdstore");
137
		verifyConsistency();
138
		String newId = mdId;
139
		if (mdId.contains("_")) {
140
			newId = StringUtils.substringBefore(mdId, "_");
141
		}
142
		final BasicDBObject instance = new BasicDBObject();
143
		instance.put("mdId", mdId);
144
		instance.put("currentId", newId);
145
		instance.put("expiring", new String[] {});
146
		getManagerTable().insertOne(instance);
147
	}
148

    
149
	/**
150
	 * {@inheritDoc}
151
	 *
152
	 * @see MDStoreTransactionManager#dropMDStore(String)
153
	 */
154
	@Override
155
	public void dropMDStore(final String mdId) throws MDStoreServiceException {
156
		verifyConsistency();
157
		log.debug("Droping MDStore: " + mdId);
158
		final BasicDBObject query = new BasicDBObject();
159
		query.put("mdId", mdId);
160
		final DBObject dropped = this.getManagerTable().findOneAndDelete(query);
161
		garbage();
162
		final String collectionName = (String) dropped.get("currentId");
163
		this.db.getCollection(collectionName).drop();
164
		this.db.getCollection("discarded-" + collectionName).drop();
165
	}
166

    
167
	/**
168
	 * {@inheritDoc}
169
	 *
170
	 * @see MDStoreTransactionManager#getMDStoreCollection(String)
171
	 */
172
	@Override
173
	public String getMDStoreCollection(final String mdId) throws MDStoreServiceException {
174
		verifyConsistency();
175
		DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
176
		if (mdstoreInfo != null)
177
			return (String) mdstoreInfo.get("currentId");
178
		else return null;
179
	}
180

    
181
	/**
182
	 * {@inheritDoc}
183
	 *
184
	 * @see MDStoreTransactionManager#startTransaction(String, boolean)
185
	 */
186
	@Override
187
	public String startTransaction(final String mdId, final boolean refresh) throws MDStoreServiceException {
188
		verifyConsistency();
189
		log.info("Start transaction for metadata store " + mdId);
190
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
191
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
192
		String idCreation = StringUtils.substringBefore(mdId, "_");
193
		idCreation = idCreation + "::" + System.currentTimeMillis();
194

    
195
		BasicDBList values;
196
		if (mdstoreInfo.containsField("transactions")) {
197
			values = (BasicDBList) mdstoreInfo.get("transactions");
198
			if (values.size() > getMaxTransactions())
199
				throw new MDStoreServiceException("Cannot create more than " + getMaxTransactions() + " transactions, found: " + values.size() + ", mdId:"
200
						+ mdId);
201
		} else {
202
			values = new BasicDBList();
203
		}
204
		final BasicDBObject transactionMetadata = new BasicDBObject();
205
		transactionMetadata.put("id", idCreation.toString());
206
		transactionMetadata.put("refresh", refresh);
207
		transactionMetadata.put("date", new Date());
208
		values.add(transactionMetadata);
209
		mdstoreInfo.put("transactions", values);
210
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
211
		return idCreation.toString();
212
	}
213

    
214
	/**
215
	 * {@inheritDoc}
216
	 *
217
	 * @see MDStoreTransactionManager#commit(String, String,
218
	 * MDStore)
219
	 */
220
	@Override
221
	public boolean commit(final String transactionId, final String mdstoreId, final MDStore current) throws MDStoreServiceException {
222
		verifyConsistency();
223
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdstoreId);
224
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdstoreId);
225
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get("transactions");
226
		final DBObject transaction = findTransaction(transactions, transactionId);
227
		if (transaction == null) throw new MDStoreServiceException("Error, unable to find transaction with Id " + transactionId);
228
		final boolean refresh = (Boolean) transaction.get("refresh");
229
		transactions.remove(transaction);
230
		final String oldId = (String) mdstoreInfo.get("currentId");
231
		if (refresh) {
232
			mdstoreInfo.put("currentId", transactionId);
233
			final BasicDBList stillUsed = (BasicDBList) mdstoreInfo.get("expiring");
234
			if (stillUsed.size() == 0) {
235
				db.getCollection(oldId).drop();
236
				db.getCollection("discarded-" + oldId).drop();
237
			}
238
			log.debug("Replaced collection ");
239
		} else {
240
			log.debug("commit incremental ");
241
			updateIncremental(transactionId, oldId);
242
			db.getCollection(transactionId).drop();
243
			db.getCollection("discarded-" + transactionId).drop();
244
		}
245
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
246

    
247
		log.info("Committed transaction for metadata store " + mdstoreId);
248
		return true;
249
	}
250

    
251
	/**
252
	 * Find transaction.
253
	 *
254
	 * @param transactions  the transactions
255
	 * @param transactionId the transaction id
256
	 * @return the DB object
257
	 */
258
	private DBObject findTransaction(final BasicDBList transactions, final String transactionId) {
259
		if (transactions.size() == 0) return null;
260
		for (int i = 0; i < transactions.size(); i++) {
261
			final BasicDBObject transaction = (BasicDBObject) transactions.get(i);
262
			final String id = (String) transaction.get("id");
263
			if (transactionId.equals(id)) return transaction;
264
		}
265
		return null;
266

    
267
	}
268

    
269
	/**
270
	 * Gets the db.
271
	 *
272
	 * @return the db
273
	 */
274
	public MongoDatabase getDb() {
275
		return db;
276
	}
277

    
278
	/**
279
	 * Sets the db.
280
	 *
281
	 * @param db the db to set
282
	 */
283
	@Required
284
	public void setDb(final MongoDatabase db) {
285
		this.db = db;
286
	}
287

    
288
	/**
289
	 * {@inheritDoc}
290
	 *
291
	 * @see MDStoreTransactionManager#readMdStore(String)
292
	 */
293
	@Override
294
	public String readMdStore(final String mdStoreId) throws MDStoreServiceException {
295
		verifyConsistency();
296
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
297
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdStoreId);
298
		final String currentId = (String) mdstoreInfo.get("currentId");
299
		final BasicDBList values = (BasicDBList) mdstoreInfo.get("expiring");
300
		updateMdstoreUsed(values, currentId);
301
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
302
		return currentId;
303

    
304
	}
305

    
306
	/**
307
	 * Update mdstore used.
308
	 *
309
	 * @param values the values
310
	 * @param mdId   the md id
311
	 */
312
	private void updateMdstoreUsed(final BasicDBList values, final String mdId) {
313
		if (values.size() > 0) {
314
			for (int i = 0; i < values.size(); i++) {
315
				final DBObject obj = (DBObject) values.get(i);
316
				final String id = (String) obj.get("id");
317
				if (mdId.equals(id)) {
318
					obj.put("lastRead", new Date());
319
					return;
320
				}
321
			}
322
		}
323
		final BasicDBObject readStore = new BasicDBObject();
324
		readStore.put("id", mdId);
325
		readStore.put("lastRead", new Date());
326
		values.add(readStore);
327
	}
328

    
329
	/**
330
	 * Gets the manager table.
331
	 *
332
	 * @return the managerTable
333
	 */
334
	public MongoCollection<DBObject> getManagerTable() {
335
		return managerTable;
336
	}
337

    
338
	/**
339
	 * Sets the manager table.
340
	 *
341
	 * @param managerTable the managerTable to set
342
	 */
343
	public void setManagerTable(final MongoCollection<DBObject> managerTable) {
344
		this.managerTable = managerTable;
345
	}
346

    
347
	/*
348
	 * (non-Javadoc)
349
	 *
350
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
351
	 */
352
	@Override
353
	public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdStoreId) throws MDStoreServiceException {
354
		verifyConsistency();
355
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
356
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdStoreId);
357
		final MDStoreManagerInfo result = new MDStoreManagerInfo();
358
		result.setCurrentId((String) mdstoreInfo.get("currentId"));
359
		result.setMdId((String) mdstoreInfo.get("mdId"));
360
		final BasicDBList values = (BasicDBList) mdstoreInfo.get("expiring");
361
		for (int i = 0; i < values.size(); i++) {
362
			final MDStoreExpiredInfo stillused = new MDStoreExpiredInfo();
363
			final DBObject value = (DBObject) values.get(i);
364
			stillused.setId((String) value.get("id"));
365
			stillused.setLastRead((Date) value.get("lastRead"));
366
			result.addExpiredItem(stillused);
367
		}
368
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get("transactions");
369
		if (transactions != null) {
370
			for (int i = 0; i < transactions.size(); i++) {
371
				final MDStoreTransactionInfo transaction = new MDStoreTransactionInfo();
372
				final DBObject value = (DBObject) transactions.get(i);
373
				final String transactionId = (String) value.get("id");
374
				transaction.setId(transactionId);
375
				transaction.setDate((Date) value.get("date"));
376
				transaction.setRefresh((Boolean) value.get("refresh"));
377
				transaction.setSize(db.getCollection(transactionId).count());
378
				result.addTransactionInfo(transaction);
379
			}
380
		}
381
		return result;
382
	}
383

    
384
	/*
385
	 * (non-Javadoc)
386
	 *
387
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
388
	 */
389
	@Override
390
	public Boolean dropUsed(final String mdId, final String idToDrop) throws MDStoreServiceException {
391
		verifyConsistency();
392
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
393
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
394
		return dropStore(mdstoreInfo, idToDrop, "expiring");
395
	}
396

    
397
	private boolean dropStore(DBObject mdstoreInfo, final String idToDrop, String transactionListName) throws MDStoreServiceException {
398
		final BasicDBList transactionList = (BasicDBList) mdstoreInfo.get(transactionListName);
399
		for (int i = 0; i < transactionList.size(); i++) {
400
			final DBObject value = (DBObject) transactionList.get(i);
401
			final String currentUsedId = (String) value.get("id");
402
			if (currentUsedId.equals(idToDrop)) {
403
				db.getCollection(idToDrop).drop();
404
				db.getCollection("discarded-" + idToDrop).drop();
405
				transactionList.remove(value);
406
				mdstoreInfo.put(transactionListName, transactionList);
407
				this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
408
				return true;
409
			}
410
		}
411
		throw new MDStoreServiceException("Error, unable to drop collection " + idToDrop);
412
	}
413

    
414
	/*
415
	 * (non-Javadoc)
416
	 *
417
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#garbage()
418
	 */
419
	@Override
420
	public void garbage() throws MDStoreServiceException {
421
		verifyConsistency();
422
		log.info("Start garbage collection of MdStore");
423
		final FindIterable<DBObject> it = this.managerTable.find();
424
		int totalDeleted = 0;
425
		for (DBObject currentObject : it) {
426
			if (log.isDebugEnabled()) {
427
				log.debug("start to check id: " + currentObject.get("currentId"));
428
			}
429
			garbageExpiring(currentObject, (String) currentObject.get("currentId"));
430
			garbageTransactions(currentObject, (String) currentObject.get("currentId"));
431
			this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", currentObject.get("_id")), currentObject);
432
		}
433

    
434
		// DELETING Collection that are not in the metadataManager table
435
		MongoIterable<String> collections = this.db.listCollectionNames();
436
		for (String collection : collections) {
437
			if ((collection.length() > 30) && (collection.contains("discarded-") == false)) {
438
				final DBObject item = getMetadataObjectForCollections(collection);
439

    
440
				if (shouldDelete(collection, item)) {
441
					if (log.isDebugEnabled()) {
442
						log.debug("delete collection: " + collection + " from mongo");
443
					}
444
					db.getCollection(collection).drop();
445
					db.getCollection("discarded-" + collection).drop();
446
					if (log.isDebugEnabled()) {
447
						log.debug("delete collection: discarded-" + collection + " from mongo");
448
					}
449
				}
450
			}
451
		}
452

    
453
		log.info("Complete garbage collection of MdStore, total store deleted: " + totalDeleted);
454
	}
455

    
456
	private DBObject getMetadataObjectForCollections(final String collectionName) {
457
		if (collectionName == null) return null;
458
		final String postfix = "_TURTdG9yZURTUmVzb3VyY2VzL01EU3RvcmVEU1Jlc291cmNlVHlwZQ==";
459
		final String tmp = collectionName.contains("discarded-") == true ? StringUtils.substringAfter(collectionName, "discarded-") : collectionName;
460
		final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + postfix;
461

    
462
		//DBObject query = QueryBuilder.start("mdId").is(collectionNameCleaned).get();
463
		Bson query = Filters.eq("mdId", collectionNameCleaned);
464
		return this.managerTable.find(query).first();
465

    
466
	}
467

    
468
	private boolean shouldDelete(final String collectionName, final DBObject metadataManagerInstance) {
469
		log.debug("should delete instance " + metadataManagerInstance);
470
		if ((metadataManagerInstance == null) || (metadataManagerInstance.get("currentId") == null)) {
471
			log.debug("the instance has not currentID");
472
			return true;
473
		}
474
		String currentId = (String) metadataManagerInstance.get("currentId");
475
		if (collectionName.equals(currentId)) return false;
476
		BasicDBList expiringList = (BasicDBList) metadataManagerInstance.get("expiring");
477
		if (findInList(expiringList, collectionName, "id") == true) return false;
478
		BasicDBList transactionsList = (BasicDBList) metadataManagerInstance.get("transactions");
479
		return findInList(transactionsList, collectionName, "id") != true;
480
	}
481

    
482
	private boolean findInList(final BasicDBList list, final String object, final String tagname) {
483
		if (list == null) return false;
484
		for (int i = 0; i < list.size(); i++) {
485
			DBObject currentObject = (DBObject) list.get(i);
486
			final String id = (String) currentObject.get(tagname);
487
			if (id.equals(object)) return true;
488
		}
489
		return false;
490
	}
491

    
492
	/**
493
	 * Delete.
494
	 *
495
	 * @param list     the list
496
	 * @param toRemove the to remove
497
	 */
498
	private void delete(final BasicDBList list, final List<DBObject> toRemove) {
499

    
500
		for (final DBObject obj : toRemove) {
501
			if (log.isDebugEnabled()) {
502
				log.debug("deleting " + obj);
503
			}
504
			list.remove(obj);
505
		}
506
	}
507

    
508
	/**
509
	 * Garbage transactions.
510
	 *
511
	 * @param currentObject the current object
512
	 * @param currentId     the current id
513
	 */
514
	private void garbageTransactions(final DBObject currentObject, final String currentId) {
515
		if (log.isDebugEnabled()) {
516
			log.debug("Start garbage transactions ");
517
		}
518

    
519
		final BasicDBList expiring = (BasicDBList) currentObject.get("transactions");
520
		if ((expiring == null) || (expiring.size() <= getMaxTransactions())) return;
521

    
522
		List<DBObject> expiringList = new ArrayList<>();
523

    
524
		for (int i = 0; i < expiring.size(); i++) {
525
			final DBObject cobj = (DBObject) expiring.get(i);
526
			if (cobj != null) {
527
				expiringList.add((DBObject) expiring.get(i));
528
			}
529

    
530
		}
531

    
532
		Collections.sort(expiringList, MDStoreUtils.getComparatorOnDate());
533

    
534
		List<DBObject> toRemove = new ArrayList<>();
535
		int i = 0;
536

    
537
		// We should remove the k item less recent
538
		// where k = numberOftotalTransaction - maxNumberOfTransaction
539
		// k = numberOfItemToRemove
540

    
541
		while (((expiringList.size() - toRemove.size()) > getMaxTransactions()) || (i < expiringList.size())) {
542
			DBObject currentObj = expiringList.get(i++);
543
			String objectId = (String) currentObj.get("id");
544
			if (!objectId.equals(currentId)) {
545
				if (log.isDebugEnabled()) {
546
					log.debug("delete collection: " + objectId + " from mongo");
547
				}
548
				db.getCollection(objectId).drop();
549
				db.getCollection("discarded-" + objectId).drop();
550
				if (log.isDebugEnabled()) {
551
					log.debug("delete collection: discarded-" + objectId + " from mongo");
552
				}
553
				toRemove.add(currentObj);
554
			} else {
555
				if (log.isDebugEnabled()) {
556
					log.debug("Cannot remove transaction " + objectId + " because is the currentId: " + currentId);
557
				}
558
			}
559
		}
560

    
561
		delete(expiring, toRemove);
562
		log.info("Deleted " + toRemove.size() + " transactions, mdStore Id:" + currentObject.get("mdId"));
563
	}
564

    
565
	/**
566
	 * Garbage expiring.
567
	 *
568
	 * @param currentObject the current object
569
	 * @param currentId     the current id
570
	 */
571
	private void garbageExpiring(final DBObject currentObject, final String currentId) {
572
		if (log.isDebugEnabled()) {
573
			log.debug("Start to search expiring mdstores for id: " + currentObject.get("mdId"));
574
		}
575
		final BasicDBList expiring = (BasicDBList) currentObject.get("expiring");
576
		final List<DBObject> toRemove = new ArrayList<>();
577
		if (log.isDebugEnabled()) {
578
			if (expiring == null) {
579
				log.debug("expiring list is null");
580
			} else {
581
				log.debug("expiring list size is :" + expiring.size());
582
			}
583
		}
584
		if ((expiring == null) || (expiring.size() == 0)) {
585
			log.debug("Deleted  0  expired  collections, mdStore Id:" + currentObject.get("mdId"));
586
			return;
587
		}
588
		for (int i = 0; i < expiring.size(); i++) {
589
			final DBObject currentExpiredStore = (DBObject) expiring.get(i);
590
			final String currentUsedId = (String) currentExpiredStore.get("id");
591
			final long d = getExpiringDays(currentExpiredStore, "lastRead");
592
			if (log.isDebugEnabled()) {
593
				log.debug("the store :" + currentId + " expired since " + d + "days ");
594
			}
595
			// DELETE the collection where the last time they was read
596
			// is more than 3 days ago
597
			if (d > getExpiredDays()) {
598
				if (currentUsedId.equals(currentId) == false) {
599
					db.getCollection(currentUsedId).drop();
600
					db.getCollection("discarded-" + currentUsedId).drop();
601
					log.debug("deleted collection " + currentUsedId);
602
				}
603
				toRemove.add(currentExpiredStore);
604
			}
605
		}
606
		delete(expiring, toRemove);
607
		log.debug("Deleted expired " + toRemove.size() + "collections, mdStore Id:" + currentObject.get("mdId"));
608
	}
609

    
610
	/**
611
	 * Gets the expiring days.
612
	 *
613
	 * @param value     the value
614
	 * @param paramName the param name
615
	 * @return the expiring days
616
	 */
617
	private long getExpiringDays(final DBObject value, final String paramName) {
618

    
619
		final Date lastRead = (Date) value.get(paramName);
620

    
621
		final LocalDate readDate = lastRead.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
622
		return Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays();
623
	}
624

    
625
	/**
626
	 * Gets the expired days.
627
	 *
628
	 * @return the expiredDays
629
	 */
630
	public int getExpiredDays() {
631
		if (this.expiredDays == 0) return 3;
632
		return expiredDays;
633
	}
634

    
635
	/**
636
	 * Sets the expired days.
637
	 *
638
	 * @param expiredDays the expiredDays to set
639
	 */
640
	public void setExpiredDays(final int expiredDays) {
641
		this.expiredDays = expiredDays;
642
	}
643

    
644
	/**
645
	 * Update incremental.
646
	 *
647
	 * @param transactionId the transaction id
648
	 * @param currentId     the current id
649
	 */
650
	private void updateIncremental(final String transactionId, final String currentId) {
651
		final MongoCollection<DBObject> transaction = db.getCollection(transactionId, DBObject.class);
652
		final MongoCollection<DBObject> mdstore = db.getCollection(currentId, DBObject.class);
653
		final FindIterable<DBObject> it = transaction.find();
654
		for (DBObject currentObj : it) {
655

    
656
			BasicDBObject newObj = new BasicDBObject();
657

    
658
			final String id = (String) currentObj.get("id");
659
			final String body = (String) currentObj.get("body");
660
			newObj.put("id", id);
661
			newObj.put("body", body);
662
			if (StringUtils.isNotBlank(id)) {
663
				//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
664
				//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
665
				final MongoCollection<DBObject> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
666
				mdstoreWrite.replaceOne(new BasicDBObject("id", id), newObj, new UpdateOptions().upsert(true));
667
			}
668
		}
669
	}
670

    
671
	/*
672
	 * (non-Javadoc)
673
	 *
674
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
675
	 */
676
	@Override
677
	public Boolean dropTransaction(final String mdId, final String idToDrop) throws MDStoreServiceException {
678
		verifyConsistency();
679
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
680
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
681
		return dropStore(mdstoreInfo, idToDrop, "transactions");
682
	}
683

    
684
	public void garbageTransactionsOnStart() throws MDStoreServiceException {
685
		verifyConsistency();
686
		FindIterable<DBObject> it = this.managerTable.find();
687
		for (DBObject currentObject : it) {
688
			final BasicDBList transactions = (BasicDBList) currentObject.get("transactions");
689
			if ((transactions != null) && (transactions.size() > 0)) {
690
				for (int i = 0; i < transactions.size(); i++) {
691
					final DBObject currentTransactions = (DBObject) transactions.get(i);
692
					final String id = (String) currentTransactions.get("id");
693
					db.getCollection(id).drop();
694
					db.getCollection("discarded-" + id).drop();
695
					log.debug("deleted collection " + id);
696
				}
697
				currentObject.put("transactions", new BasicDBList());
698
				this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", currentObject.get("_id")), currentObject);
699
			}
700
		}
701
	}
702

    
703
	/**
704
	 * Gets the max transactions.
705
	 *
706
	 * @return the maxTransactions
707
	 */
708
	public int getMaxTransactions() {
709
		return maxTransactions;
710
	}
711

    
712
	/**
713
	 * Sets the max transactions.
714
	 *
715
	 * @param maxTransactions the maxTransactions to set
716
	 */
717
	public void setMaxTransactions(final int maxTransactions) {
718
		this.maxTransactions = maxTransactions;
719
	}
720
}
(2-2/6)