Project

General

Profile

1 28055 sandro.lab
package eu.dnetlib.data.mdstore.modular.mongodb;
2
3 42497 sandro.lab
import java.util.ArrayList;
4 33589 sandro.lab
import java.util.Date;
5
import java.util.List;
6
7 34000 sandro.lab
import com.google.common.collect.Lists;
8 33589 sandro.lab
import com.mongodb.BasicDBList;
9
import com.mongodb.BasicDBObject;
10
import com.mongodb.DBObject;
11
import com.mongodb.WriteConcern;
12 41551 alessia.ba
import com.mongodb.client.FindIterable;
13
import com.mongodb.client.MongoCollection;
14
import com.mongodb.client.MongoDatabase;
15
import com.mongodb.client.MongoIterable;
16
import com.mongodb.client.model.Filters;
17 47863 claudio.at
import com.mongodb.client.model.IndexOptions;
18 41551 alessia.ba
import com.mongodb.client.model.UpdateOptions;
19 33589 sandro.lab
import eu.dnetlib.data.mdstore.MDStoreServiceException;
20 41551 alessia.ba
import eu.dnetlib.data.mdstore.modular.connector.*;
21 34000 sandro.lab
import eu.dnetlib.data.mdstore.modular.mongodb.utils.MDStoreUtils;
22 50676 claudio.at
import org.apache.commons.lang3.StringUtils;
23 41551 alessia.ba
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.bson.conversions.Bson;
26
import org.joda.time.DateTime;
27
import org.joda.time.Days;
28
import org.springframework.beans.factory.annotation.Required;
29 28153 sandro.lab
30 28055 sandro.lab
/**
31
 * The Class MDStoreTransactionManager.
32
 */
33
public class MDStoreTransactionManagerImpl implements MDStoreTransactionManager {
34
35 34000 sandro.lab
	/** The Constant log. */
36 33589 sandro.lab
	private static final Log log = LogFactory.getLog(MDStoreTransactionManagerImpl.class);
37 28174 sandro.lab
38 33589 sandro.lab
	/**
39
	 * The table name.
40
	 */
41
	private static String TABLE_NAME = "metadataManager";
42 28055 sandro.lab
43 34000 sandro.lab
	/** The max number of concurrent transactions per mdstore. */
44
	private int maxTransactions = 1;
45
46 33589 sandro.lab
	/**
47
	 * The db.
48
	 */
49 41551 alessia.ba
	private MongoDatabase db;
50 28055 sandro.lab
51 33589 sandro.lab
	/**
52
	 * The manager table.
53
	 */
54 41551 alessia.ba
	private MongoCollection<DBObject> managerTable;
55 28055 sandro.lab
56 34000 sandro.lab
	/** The expired days. */
57 33589 sandro.lab
	private int expiredDays;
58 28055 sandro.lab
59 47863 claudio.at
	private final IndexOptions options = new IndexOptions().background(true);
60
61 33589 sandro.lab
	/**
62
	 * Bootstrap manager.
63
	 */
64
	private void bootstrapManager() {
65
		log.debug("Bootstrap Manager start");
66 41551 alessia.ba
		final MongoCollection<DBObject> metadataColl = db.getCollection("metadata", DBObject.class);
67
		final FindIterable<DBObject> values = metadataColl.find();
68
		this.setManagerTable(db.getCollection(TABLE_NAME, DBObject.class));
69
		for (DBObject object : values) {
70 33792 claudio.at
			final String id = (String) object.get("mdId");
71 41551 alessia.ba
			String newId = id;
72 33589 sandro.lab
			if (id.contains("_")) {
73
				newId = StringUtils.substringBefore(id, "_");
74
			}
75 33792 claudio.at
			final BasicDBObject input = new BasicDBObject();
76 33589 sandro.lab
			input.put("mdId", id);
77
			input.put("currentId", newId);
78
			input.put("expiring", new String[] {});
79
			input.put("transactions", new String[] {});
80 41551 alessia.ba
			getManagerTable().insertOne(input);
81 33589 sandro.lab
			log.debug(String.format("Added %s to Metadata Manager data structure", id));
82 28055 sandro.lab
83 33589 sandro.lab
		}
84 33792 claudio.at
		final BasicDBObject ensureIndex = new BasicDBObject();
85 33589 sandro.lab
		ensureIndex.put("mdId", 1);
86
		log.debug("Create index in MetadaManager ");
87 47863 claudio.at
		this.getManagerTable().createIndex(ensureIndex, options);
88 33589 sandro.lab
	}
89 28055 sandro.lab
90 33589 sandro.lab
	/**
91 41551 alessia.ba
	 * Gets the DBObject describing an mdstore. null if there is no mdstore with the given id.
92
	 *
93 50676 claudio.at
	 * @param mdstoreID the mdStore identifier
94 41551 alessia.ba
	 * @return DBObject or null
95
	 */
96 50676 claudio.at
	private DBObject getMDStoreAsDBObject(final String mdstoreID) throws MDStoreServiceException {
97 41551 alessia.ba
		final BasicDBObject query = new BasicDBObject();
98
		query.put("mdId", mdstoreID);
99
		final FindIterable<DBObject> it = this.getManagerTable().find(query);
100 50676 claudio.at
		return it.first();
101 41551 alessia.ba
	}
102
103
	/**
104 33589 sandro.lab
	 * Verify consistency.
105 36324 sandro.lab
	 *
106 33589 sandro.lab
	 * @throws MDStoreServiceException
107
	 *             the MD store service exception
108
	 */
109
	@Override
110
	public void verifyConsistency() throws MDStoreServiceException {
111
		if (this.getManagerTable() == null) {
112 41551 alessia.ba
			if (!Lists.newArrayList(db.listCollectionNames()).contains(TABLE_NAME))
113 33589 sandro.lab
				bootstrapManager();
114 41551 alessia.ba
			else {
115
				this.setManagerTable(db.getCollection(TABLE_NAME, DBObject.class));
116 33589 sandro.lab
			}
117
		}
118 41551 alessia.ba
		if (this.getManagerTable() == null) throw new MDStoreServiceException("Something bad happen, unable to create managerTable");
119 33589 sandro.lab
	}
120 28055 sandro.lab
121 33589 sandro.lab
	/**
122
	 * {@inheritDoc}
123 36324 sandro.lab
	 *
124 33589 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#createMDStore(java.lang.String)
125
	 */
126
	@Override
127
	public void createMDStore(final String mdId) throws MDStoreServiceException {
128
		log.debug("Creating new mdstore");
129
		verifyConsistency();
130
		String newId = mdId;
131
		if (mdId.contains("_")) {
132
			newId = StringUtils.substringBefore(mdId, "_");
133
		}
134 33792 claudio.at
		final BasicDBObject instance = new BasicDBObject();
135 33589 sandro.lab
		instance.put("mdId", mdId);
136
		instance.put("currentId", newId);
137
		instance.put("expiring", new String[] {});
138 41551 alessia.ba
		getManagerTable().insertOne(instance);
139 33589 sandro.lab
	}
140 28055 sandro.lab
141 33589 sandro.lab
	/**
142
	 * {@inheritDoc}
143 36324 sandro.lab
	 *
144 33589 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropMDStore(java.lang.String)
145
	 */
146
	@Override
147
	public void dropMDStore(final String mdId) throws MDStoreServiceException {
148
		verifyConsistency();
149
		log.debug("Droping MDStore: " + mdId);
150 33792 claudio.at
		final BasicDBObject query = new BasicDBObject();
151 33589 sandro.lab
		query.put("mdId", mdId);
152 41551 alessia.ba
		final DBObject dropped = this.getManagerTable().findOneAndDelete(query);
153 33589 sandro.lab
		garbage();
154 41551 alessia.ba
		final String collectionName = (String) dropped.get("currentId");
155 33589 sandro.lab
		this.db.getCollection(collectionName).drop();
156
		this.db.getCollection("discarded-" + collectionName).drop();
157
	}
158 28055 sandro.lab
159 33589 sandro.lab
	/**
160
	 * {@inheritDoc}
161 36324 sandro.lab
	 *
162 33589 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#getMDStoreCollection(java.lang.String)
163
	 */
164
	@Override
165
	public String getMDStoreCollection(final String mdId) throws MDStoreServiceException {
166
		verifyConsistency();
167 41551 alessia.ba
		DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
168
		if (mdstoreInfo != null)
169
			return (String) mdstoreInfo.get("currentId");
170
		else return null;
171 33589 sandro.lab
	}
172 28055 sandro.lab
173 33589 sandro.lab
	/**
174
	 * {@inheritDoc}
175 36324 sandro.lab
	 *
176 33589 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#startTransaction(java.lang.String, boolean)
177
	 */
178
	@Override
179
	public String startTransaction(final String mdId, final boolean refresh) throws MDStoreServiceException {
180
		verifyConsistency();
181
		log.info("Start transaction for metadata store " + mdId);
182 41551 alessia.ba
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
183
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
184 33589 sandro.lab
		String idCreation = StringUtils.substringBefore(mdId, "_");
185
		idCreation = idCreation + "::" + System.currentTimeMillis();
186 34000 sandro.lab
187 33589 sandro.lab
		BasicDBList values;
188 41551 alessia.ba
		if (mdstoreInfo.containsField("transactions")) {
189
			values = (BasicDBList) mdstoreInfo.get("transactions");
190 34605 alessia.ba
			if (values.size() > getMaxTransactions())
191
				throw new MDStoreServiceException("Cannot create more than " + getMaxTransactions() + " transactions, found: " + values.size() + ", mdId:"
192
						+ mdId);
193 33589 sandro.lab
		} else {
194
			values = new BasicDBList();
195
		}
196 34000 sandro.lab
		final BasicDBObject transactionMetadata = new BasicDBObject();
197 50676 claudio.at
		transactionMetadata.put("id", idCreation);
198 34000 sandro.lab
		transactionMetadata.put("refresh", refresh);
199
		transactionMetadata.put("date", new Date());
200 33589 sandro.lab
		values.add(transactionMetadata);
201 41551 alessia.ba
		mdstoreInfo.put("transactions", values);
202
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
203 50676 claudio.at
		return idCreation;
204 33589 sandro.lab
	}
205 28055 sandro.lab
206 33589 sandro.lab
	/**
207
	 * {@inheritDoc}
208 36324 sandro.lab
	 *
209 33589 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#commit(java.lang.String, java.lang.String,
210
	 *      eu.dnetlib.data.mdstore.modular.connector.MDStore)
211
	 */
212
	@Override
213
	public boolean commit(final String transactionId, final String mdstoreId, final MDStore current) throws MDStoreServiceException {
214
		verifyConsistency();
215 41551 alessia.ba
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdstoreId);
216
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdstoreId);
217
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get("transactions");
218 33792 claudio.at
		final DBObject transaction = findTransaction(transactions, transactionId);
219 34605 alessia.ba
		if (transaction == null) throw new MDStoreServiceException("Error, unable to find transaction with Id " + transactionId);
220 33792 claudio.at
		final boolean refresh = (Boolean) transaction.get("refresh");
221 33589 sandro.lab
		transactions.remove(transaction);
222 41551 alessia.ba
		final String oldId = (String) mdstoreInfo.get("currentId");
223 33589 sandro.lab
		if (refresh) {
224 41551 alessia.ba
			mdstoreInfo.put("currentId", transactionId);
225
			final BasicDBList stillUsed = (BasicDBList) mdstoreInfo.get("expiring");
226 33589 sandro.lab
			if (stillUsed.size() == 0) {
227
				db.getCollection(oldId).drop();
228
				db.getCollection("discarded-" + oldId).drop();
229
			}
230
			log.debug("Replaced collection ");
231
		} else {
232
			log.debug("commit incremental ");
233 33620 claudio.at
			updateIncremental(transactionId, oldId);
234 33626 sandro.lab
			db.getCollection(transactionId).drop();
235
			db.getCollection("discarded-" + transactionId).drop();
236 33589 sandro.lab
		}
237 41551 alessia.ba
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
238 28153 sandro.lab
239 33589 sandro.lab
		log.info("Committed transaction for metadata store " + mdstoreId);
240
		return true;
241
	}
242 28174 sandro.lab
243 34000 sandro.lab
	/**
244
	 * Find transaction.
245 36324 sandro.lab
	 *
246 34000 sandro.lab
	 * @param transactions
247
	 *            the transactions
248
	 * @param transactionId
249
	 *            the transaction id
250
	 * @return the DB object
251
	 */
252 33589 sandro.lab
	private DBObject findTransaction(final BasicDBList transactions, final String transactionId) {
253 34605 alessia.ba
		if (transactions.size() == 0) return null;
254 50676 claudio.at
		for (Object tx : transactions) {
255
			final BasicDBObject transaction = (BasicDBObject) tx;
256 33792 claudio.at
			final String id = (String) transaction.get("id");
257 34605 alessia.ba
			if (transactionId.equals(id)) return transaction;
258 33589 sandro.lab
		}
259
		return null;
260 28055 sandro.lab
261 33589 sandro.lab
	}
262 28055 sandro.lab
263 33589 sandro.lab
	/**
264
	 * Gets the db.
265 36324 sandro.lab
	 *
266 33589 sandro.lab
	 * @return the db
267
	 */
268 41551 alessia.ba
	public MongoDatabase getDb() {
269 33589 sandro.lab
		return db;
270
	}
271 28055 sandro.lab
272 33589 sandro.lab
	/**
273
	 * Sets the db.
274 36324 sandro.lab
	 *
275 33589 sandro.lab
	 * @param db
276
	 *            the db to set
277
	 */
278
	@Required
279 41551 alessia.ba
	public void setDb(final MongoDatabase db) {
280 33589 sandro.lab
		this.db = db;
281
	}
282 28055 sandro.lab
283 33589 sandro.lab
	/**
284
	 * {@inheritDoc}
285 36324 sandro.lab
	 *
286 33589 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#readMdStore(java.lang.String)
287
	 */
288
	@Override
289
	public String readMdStore(final String mdStoreId) throws MDStoreServiceException {
290
		verifyConsistency();
291 41551 alessia.ba
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
292
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdStoreId);
293
		final String currentId = (String) mdstoreInfo.get("currentId");
294
		final BasicDBList values = (BasicDBList) mdstoreInfo.get("expiring");
295 33589 sandro.lab
		updateMdstoreUsed(values, currentId);
296 41551 alessia.ba
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
297 33589 sandro.lab
		return currentId;
298 28055 sandro.lab
299 33589 sandro.lab
	}
300 28055 sandro.lab
301 41551 alessia.ba
302 34000 sandro.lab
	/**
303
	 * Update mdstore used.
304 36324 sandro.lab
	 *
305 34000 sandro.lab
	 * @param values
306
	 *            the values
307
	 * @param mdId
308
	 *            the md id
309
	 */
310 33589 sandro.lab
	private void updateMdstoreUsed(final BasicDBList values, final String mdId) {
311
		if (values.size() > 0) {
312 50676 claudio.at
			for (Object value : values) {
313
				final DBObject obj = (DBObject) value;
314 33792 claudio.at
				final String id = (String) obj.get("id");
315 33589 sandro.lab
				if (mdId.equals(id)) {
316
					obj.put("lastRead", new Date());
317
					return;
318
				}
319
			}
320
		}
321 33792 claudio.at
		final BasicDBObject readStore = new BasicDBObject();
322 33589 sandro.lab
		readStore.put("id", mdId);
323
		readStore.put("lastRead", new Date());
324
		values.add(readStore);
325
	}
326 28055 sandro.lab
327 33589 sandro.lab
	/**
328 34000 sandro.lab
	 * Gets the manager table.
329 36324 sandro.lab
	 *
330 33589 sandro.lab
	 * @return the managerTable
331
	 */
332 41551 alessia.ba
	public MongoCollection<DBObject> getManagerTable() {
333 33589 sandro.lab
		return managerTable;
334
	}
335 28055 sandro.lab
336 33589 sandro.lab
	/**
337 34000 sandro.lab
	 * Sets the manager table.
338 36324 sandro.lab
	 *
339 33589 sandro.lab
	 * @param managerTable
340
	 *            the managerTable to set
341
	 */
342 41551 alessia.ba
	public void setManagerTable(final MongoCollection<DBObject> managerTable) {
343 33589 sandro.lab
		this.managerTable = managerTable;
344
	}
345 28055 sandro.lab
346 34000 sandro.lab
	/*
347
	 * (non-Javadoc)
348 36324 sandro.lab
	 *
349 34000 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
350
	 */
351 33589 sandro.lab
	@Override
352
	public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdStoreId) throws MDStoreServiceException {
353
		verifyConsistency();
354 41551 alessia.ba
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
355
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdStoreId);
356 33792 claudio.at
		final MDStoreManagerInfo result = new MDStoreManagerInfo();
357 41551 alessia.ba
		result.setCurrentId((String) mdstoreInfo.get("currentId"));
358
		result.setMdId((String) mdstoreInfo.get("mdId"));
359
		final BasicDBList values = (BasicDBList) mdstoreInfo.get("expiring");
360 50676 claudio.at
		for (Object v : values) {
361 33792 claudio.at
			final MDStoreExpiredInfo stillused = new MDStoreExpiredInfo();
362 50676 claudio.at
			final DBObject value = (DBObject) v;
363 33589 sandro.lab
			stillused.setId((String) value.get("id"));
364
			stillused.setLastRead((Date) value.get("lastRead"));
365
			result.addExpiredItem(stillused);
366
		}
367 41551 alessia.ba
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get("transactions");
368 33589 sandro.lab
		if (transactions != null) {
369 50676 claudio.at
			for (Object tx : transactions) {
370 33792 claudio.at
				final MDStoreTransactionInfo transaction = new MDStoreTransactionInfo();
371 50676 claudio.at
				final DBObject value = (DBObject) tx;
372 33792 claudio.at
				final String transactionId = (String) value.get("id");
373 33589 sandro.lab
				transaction.setId(transactionId);
374
				transaction.setDate((Date) value.get("date"));
375
				transaction.setRefresh((Boolean) value.get("refresh"));
376
				transaction.setSize(db.getCollection(transactionId).count());
377
				result.addTransactionInfo(transaction);
378
			}
379
		}
380
		return result;
381
	}
382 28055 sandro.lab
383 34000 sandro.lab
	/*
384
	 * (non-Javadoc)
385 36324 sandro.lab
	 *
386 34000 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
387
	 */
388 33589 sandro.lab
	@Override
389
	public Boolean dropUsed(final String mdId, final String idToDrop) throws MDStoreServiceException {
390
		verifyConsistency();
391 41551 alessia.ba
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
392
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
393
		return dropStore(mdstoreInfo, idToDrop, "expiring");
394
	}
395
396
	private boolean dropStore(DBObject mdstoreInfo, final String idToDrop, String transactionListName) throws MDStoreServiceException {
397
		final BasicDBList transactionList = (BasicDBList) mdstoreInfo.get(transactionListName);
398
		for (int i = 0; i < transactionList.size(); i++) {
399
			final DBObject value = (DBObject) transactionList.get(i);
400 33792 claudio.at
			final String currentUsedId = (String) value.get("id");
401 33589 sandro.lab
			if (currentUsedId.equals(idToDrop)) {
402
				db.getCollection(idToDrop).drop();
403
				db.getCollection("discarded-" + idToDrop).drop();
404 41551 alessia.ba
				transactionList.remove(value);
405
				mdstoreInfo.put(transactionListName, transactionList);
406
				this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
407 33589 sandro.lab
				return true;
408
			}
409
		}
410 41551 alessia.ba
		throw new MDStoreServiceException("Error, unable to drop collection " + idToDrop);
411 33589 sandro.lab
	}
412 28055 sandro.lab
413 34000 sandro.lab
	/*
414
	 * (non-Javadoc)
415 36324 sandro.lab
	 *
416 34000 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#garbage()
417
	 */
418 33589 sandro.lab
	@Override
419
	public void garbage() throws MDStoreServiceException {
420
		verifyConsistency();
421
		log.info("Start garbage collection of MdStore");
422 41551 alessia.ba
		final FindIterable<DBObject> it = this.managerTable.find();
423 33589 sandro.lab
		int totalDeleted = 0;
424 41551 alessia.ba
		for (DBObject currentObject :  it){
425 34000 sandro.lab
			if (log.isDebugEnabled()) {
426 41551 alessia.ba
				log.debug("start to check id: " + currentObject.get("currentId"));
427 34000 sandro.lab
			}
428
			garbageExpiring(currentObject, (String) currentObject.get("currentId"));
429
			garbageTransactions(currentObject, (String) currentObject.get("currentId"));
430 41551 alessia.ba
			this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", currentObject.get("_id")), currentObject);
431 34000 sandro.lab
		}
432
433 34031 sandro.lab
		// DELETING Collection that are not in the metadataManager table
434 41551 alessia.ba
		MongoIterable<String> collections = this.db.listCollectionNames();
435 34031 sandro.lab
		for (String collection : collections) {
436 50676 claudio.at
			if ((collection.length() > 30) && (!collection.contains("discarded-"))) {
437 34031 sandro.lab
				final DBObject item = getMetadataObjectForCollections(collection);
438 34033 sandro.lab
439 34031 sandro.lab
				if (shouldDelete(collection, item)) {
440
					if (log.isDebugEnabled()) {
441
						log.debug("delete collection: " + collection + " from mongo");
442
					}
443
					db.getCollection(collection).drop();
444
					db.getCollection("discarded-" + collection).drop();
445
					if (log.isDebugEnabled()) {
446
						log.debug("delete collection: discarded-" + collection + " from mongo");
447
					}
448
				}
449
			}
450
		}
451
452 34000 sandro.lab
		log.info("Complete garbage collection of MdStore, total store deleted: " + totalDeleted);
453
	}
454
455 34031 sandro.lab
	private DBObject getMetadataObjectForCollections(final String collectionName) {
456 34605 alessia.ba
		if (collectionName == null) return null;
457 34031 sandro.lab
		final String postfix = "_TURTdG9yZURTUmVzb3VyY2VzL01EU3RvcmVEU1Jlc291cmNlVHlwZQ==";
458 50676 claudio.at
		final String tmp = collectionName.contains("discarded-") ? StringUtils.substringAfter(collectionName, "discarded-") : collectionName;
459 34031 sandro.lab
		final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + postfix;
460
461 41551 alessia.ba
		//DBObject query = QueryBuilder.start("mdId").is(collectionNameCleaned).get();
462
		Bson query = Filters.eq("mdId", collectionNameCleaned);
463
		return this.managerTable.find(query).first();
464
465 34031 sandro.lab
	}
466
467
	private boolean shouldDelete(final String collectionName, final DBObject metadataManagerInstance) {
468 34943 sandro.lab
		log.debug("should delete instance "+metadataManagerInstance);
469 36324 sandro.lab
		if((metadataManagerInstance== null) || (metadataManagerInstance.get("currentId")== null)){
470 34943 sandro.lab
			log.debug("the instance has not currentID");
471
			return true;
472
		}
473 34031 sandro.lab
		String currentId = (String) metadataManagerInstance.get("currentId");
474 34605 alessia.ba
		if (collectionName.equals(currentId)) return false;
475 34031 sandro.lab
		BasicDBList expiringList = (BasicDBList) metadataManagerInstance.get("expiring");
476 50676 claudio.at
		if (findInList(expiringList, collectionName, "id")) return false;
477 34031 sandro.lab
		BasicDBList transactionsList = (BasicDBList) metadataManagerInstance.get("transactions");
478 50676 claudio.at
		return !findInList(transactionsList, collectionName, "id");
479 34031 sandro.lab
	}
480
481
	private boolean findInList(final BasicDBList list, final String object, final String tagname) {
482 34605 alessia.ba
		if (list == null) return false;
483 50676 claudio.at
		for (Object o : list) {
484
			DBObject currentObject = (DBObject) o;
485 34031 sandro.lab
			final String id = (String) currentObject.get(tagname);
486 34605 alessia.ba
			if (id.equals(object)) return true;
487 34031 sandro.lab
		}
488
		return false;
489
	}
490
491 34000 sandro.lab
	/**
492
	 * Delete.
493 36324 sandro.lab
	 *
494 34000 sandro.lab
	 * @param list
495
	 *            the list
496
	 * @param toRemove
497
	 *            the to remove
498
	 */
499
	private void delete(final BasicDBList list, final List<DBObject> toRemove) {
500
501
		for (final DBObject obj : toRemove) {
502
			if (log.isDebugEnabled()) {
503
				log.debug("deleting " + obj);
504
			}
505
			list.remove(obj);
506
		}
507
	}
508
509
	/**
510
	 * Garbage transactions.
511 36324 sandro.lab
	 *
512 34000 sandro.lab
	 * @param currentObject
513
	 *            the current object
514
	 * @param currentId
515
	 *            the current id
516
	 */
517
	private void garbageTransactions(final DBObject currentObject, final String currentId) {
518
		if (log.isDebugEnabled()) {
519
			log.debug("Start garbage transactions ");
520
		}
521
522
		final BasicDBList expiring = (BasicDBList) currentObject.get("transactions");
523 34605 alessia.ba
		if ((expiring == null) || (expiring.size() <= getMaxTransactions())) return;
524 34000 sandro.lab
525
		List<DBObject> expiringList = Lists.newArrayList();
526
527 50676 claudio.at
		for (Object o : expiring) {
528
			final DBObject cobj = (DBObject) o;
529 34000 sandro.lab
			if (cobj != null) {
530 50676 claudio.at
				expiringList.add((DBObject) o);
531 34000 sandro.lab
			}
532
533
		}
534
535 50676 claudio.at
		expiringList.sort(MDStoreUtils.getComparatorOnDate());
536 34000 sandro.lab
537
		List<DBObject> toRemove = Lists.newArrayList();
538
		int i = 0;
539
540
		// We should remove the k item less recent
541
		// where k = numberOftotalTransaction - maxNumberOfTransaction
542
		// k = numberOfItemToRemove
543
544 34605 alessia.ba
		while (((expiringList.size() - toRemove.size()) > getMaxTransactions()) || (i < expiringList.size())) {
545 34000 sandro.lab
			DBObject currentObj = expiringList.get(i++);
546
			String objectId = (String) currentObj.get("id");
547
			if (!objectId.equals(currentId)) {
548
				if (log.isDebugEnabled()) {
549
					log.debug("delete collection: " + objectId + " from mongo");
550 33589 sandro.lab
				}
551 34000 sandro.lab
				db.getCollection(objectId).drop();
552
				db.getCollection("discarded-" + objectId).drop();
553
				if (log.isDebugEnabled()) {
554
					log.debug("delete collection: discarded-" + objectId + " from mongo");
555
				}
556
				toRemove.add(currentObj);
557
			} else {
558
				if (log.isDebugEnabled()) {
559
					log.debug("Cannot remove transaction " + objectId + " because is the currentId: " + currentId);
560
				}
561
			}
562
		}
563 28055 sandro.lab
564 34000 sandro.lab
		delete(expiring, toRemove);
565
		log.info("Deleted " + toRemove.size() + " transactions, mdStore Id:" + currentObject.get("mdId"));
566
	}
567
568
	/**
569
	 * Garbage expiring.
570 36324 sandro.lab
	 *
571 34000 sandro.lab
	 * @param currentObject
572
	 *            the current object
573
	 * @param currentId
574
	 *            the current id
575
	 */
576
	private void garbageExpiring(final DBObject currentObject, final String currentId) {
577
		if (log.isDebugEnabled()) {
578
			log.debug("Start to search expiring mdstores for id: " + currentObject.get("mdId"));
579
		}
580
		final BasicDBList expiring = (BasicDBList) currentObject.get("expiring");
581
		final List<DBObject> toRemove = Lists.newArrayList();
582
		if (log.isDebugEnabled()) {
583
			if (expiring == null) {
584
				log.debug("expiring list is null");
585
			} else {
586
				log.debug("expiring list size is :" + expiring.size());
587 33589 sandro.lab
			}
588 34000 sandro.lab
		}
589 34605 alessia.ba
		if ((expiring == null) || (expiring.size() == 0)) {
590
			log.debug("Deleted  0  expired  collections, mdStore Id:" + currentObject.get("mdId"));
591 34000 sandro.lab
			return;
592
		}
593 50676 claudio.at
		for (Object anExpiring : expiring) {
594
			final DBObject currentExpiredStore = (DBObject) anExpiring;
595 34000 sandro.lab
			final String currentUsedId = (String) currentExpiredStore.get("id");
596
			final Days d = getExpiringDays(currentExpiredStore, "lastRead");
597
			if (log.isDebugEnabled()) {
598
				log.debug("the store :" + currentId + " expired since " + d.getDays() + "days ");
599 33589 sandro.lab
			}
600 34000 sandro.lab
			// DELETE the collection where the last time they was read
601
			// is more than 3 days ago
602
			if (d.getDays() > getExpiredDays()) {
603 50676 claudio.at
				if (!currentUsedId.equals(currentId)) {
604 34000 sandro.lab
					db.getCollection(currentUsedId).drop();
605
					db.getCollection("discarded-" + currentUsedId).drop();
606
					log.debug("deleted collection " + currentUsedId);
607
				}
608
				toRemove.add(currentExpiredStore);
609
			}
610 33589 sandro.lab
		}
611 34000 sandro.lab
		delete(expiring, toRemove);
612 34605 alessia.ba
		log.debug("Deleted expired " + toRemove.size() + "collections, mdStore Id:" + currentObject.get("mdId"));
613 34000 sandro.lab
	}
614 29274 sandro.lab
615 34000 sandro.lab
	/**
616
	 * Gets the expiring days.
617 36324 sandro.lab
	 *
618 34000 sandro.lab
	 * @param value
619
	 *            the value
620
	 * @param paramName
621
	 *            the param name
622
	 * @return the expiring days
623
	 */
624
	private Days getExpiringDays(final DBObject value, final String paramName) {
625
		final Date lastRead = (Date) value.get(paramName);
626
		final DateTime last = new DateTime(lastRead);
627
		final DateTime today = new DateTime();
628
		final Days d = Days.daysBetween(last, today);
629
		return d;
630 33589 sandro.lab
	}
631 29274 sandro.lab
632 33589 sandro.lab
	/**
633 34000 sandro.lab
	 * Gets the expired days.
634 36324 sandro.lab
	 *
635 33589 sandro.lab
	 * @return the expiredDays
636
	 */
637
	public int getExpiredDays() {
638 34605 alessia.ba
		if (this.expiredDays == 0) return 3;
639 33589 sandro.lab
		return expiredDays;
640
	}
641 28055 sandro.lab
642 33589 sandro.lab
	/**
643 34000 sandro.lab
	 * Sets the expired days.
644 36324 sandro.lab
	 *
645 33589 sandro.lab
	 * @param expiredDays
646
	 *            the expiredDays to set
647
	 */
648
	public void setExpiredDays(final int expiredDays) {
649
		this.expiredDays = expiredDays;
650
	}
651 28055 sandro.lab
652 34000 sandro.lab
	/**
653
	 * Update incremental.
654 36324 sandro.lab
	 *
655 34000 sandro.lab
	 * @param transactionId
656
	 *            the transaction id
657
	 * @param currentId
658
	 *            the current id
659
	 */
660 33589 sandro.lab
	private void updateIncremental(final String transactionId, final String currentId) {
661 41687 alessia.ba
		final MongoCollection<DBObject> transaction = db.getCollection(transactionId, DBObject.class);
662
		final MongoCollection<DBObject> mdstore = db.getCollection(currentId, DBObject.class);
663 47863 claudio.at
		final FindIterable<DBObject> it = transaction.find().noCursorTimeout(true);
664
		for (DBObject currentObj : it) {
665 33792 claudio.at
			final String id = (String) currentObj.get("id");
666 55359 alessia.ba
			BasicDBObject newObj = (BasicDBObject)((BasicDBObject) currentObj).copy();
667
			//Must not have _id in the new object
668
			newObj.remove("_id");
669 55330 alessia.ba
			//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
670
			//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
671
			final MongoCollection<DBObject> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
672 55334 alessia.ba
			mdstoreWrite.replaceOne(new BasicDBObject("id", id), newObj, new UpdateOptions().upsert(true));
673 33589 sandro.lab
		}
674
	}
675 28153 sandro.lab
676 34000 sandro.lab
	/*
677
	 * (non-Javadoc)
678 36324 sandro.lab
	 *
679 34000 sandro.lab
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
680
	 */
681 33589 sandro.lab
	@Override
682
	public Boolean dropTransaction(final String mdId, final String idToDrop) throws MDStoreServiceException {
683
		verifyConsistency();
684 41551 alessia.ba
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
685
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
686
		return dropStore(mdstoreInfo, idToDrop, "transactions");
687 33589 sandro.lab
	}
688 34000 sandro.lab
689 34145 sandro.lab
	public void garbageTransactionsOnStart() throws MDStoreServiceException {
690
		verifyConsistency();
691 41551 alessia.ba
		FindIterable<DBObject> it = this.managerTable.find();
692 42497 sandro.lab
693
		final List<String> currentMdStoreId = Lists.newArrayList();
694 41551 alessia.ba
		for (DBObject currentObject : it){
695 42497 sandro.lab
			currentMdStoreId.add((String) currentObject.get("currentId"));
696 34145 sandro.lab
			final BasicDBList transactions = (BasicDBList) currentObject.get("transactions");
697 34605 alessia.ba
			if ((transactions != null) && (transactions.size() > 0)) {
698 50676 claudio.at
				for (Object tx : transactions) {
699
					final DBObject currentTransactions = (DBObject) tx;
700 34145 sandro.lab
					final String id = (String) currentTransactions.get("id");
701
					db.getCollection(id).drop();
702
					db.getCollection("discarded-" + id).drop();
703
					log.debug("deleted collection " + id);
704
				}
705 41551 alessia.ba
				currentObject.put("transactions", new BasicDBList());
706
				this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", currentObject.get("_id")), currentObject);
707 34145 sandro.lab
			}
708
		}
709 42497 sandro.lab
710
		//DELETING ALL THE DISCARDED COLLECTION THAT DISCARDED COLLECTION OF THE CURRENT MDSTORE
711
		final ArrayList<String> collectionsNames = Lists.newArrayList(db.listCollectionNames());
712
713
		for (String item : collectionsNames) {
714
			if (item.startsWith("discarded-")) {
715
				final String currentCollection = StringUtils.substringAfter(item, "discarded-");
716
				if (!currentMdStoreId.contains(currentCollection)) {
717 42498 sandro.lab
					log.info("Deleting discarded collection :" + item);
718 42497 sandro.lab
					this.db.getCollection(item).drop();
719
				}
720
			}
721
		}
722 34145 sandro.lab
	}
723
724 34000 sandro.lab
	/**
725
	 * Gets the max transactions.
726 36324 sandro.lab
	 *
727 34000 sandro.lab
	 * @return the maxTransactions
728
	 */
729
	public int getMaxTransactions() {
730
		return maxTransactions;
731
	}
732
733
	/**
734
	 * Sets the max transactions.
735 36324 sandro.lab
	 *
736 34000 sandro.lab
	 * @param maxTransactions
737
	 *            the maxTransactions to set
738
	 */
739
	public void setMaxTransactions(final int maxTransactions) {
740
		this.maxTransactions = maxTransactions;
741
	}
742 28055 sandro.lab
}