Project

General

Profile

1
package eu.dnetlib.msro.workers.aggregation.mdstore.mongo;
2

    
3
import java.time.Duration;
4
import java.time.LocalDate;
5
import java.time.ZoneId;
6
import java.util.Date;
7
import java.util.List;
8
import java.util.Optional;
9
import java.util.TreeSet;
10
import java.util.stream.Collectors;
11
import javax.annotation.PostConstruct;
12

    
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Sets;
15
import com.mongodb.BasicDBObject;
16
import com.mongodb.WriteConcern;
17
import com.mongodb.client.MongoCollection;
18
import com.mongodb.client.MongoDatabase;
19
import com.mongodb.client.model.Filters;
20
import com.mongodb.client.model.UpdateOptions;
21
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
22
import eu.dnetlib.msro.workers.aggregation.mdstore.*;
23
import eu.dnetlib.msro.workers.aggregation.mdstore.model.*;
24
import org.apache.commons.lang3.StringUtils;
25
import org.apache.commons.logging.Log;
26
import org.apache.commons.logging.LogFactory;
27
import org.springframework.beans.factory.annotation.Autowired;
28
import org.springframework.stereotype.Component;
29

    
30
import static eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreConstants.*;
31

    
32
/**
33
 * Created by claudio on 13/03/2017.
34
 */
35
@Component
36
public class MDStoreTransactionManagerImpl implements MDStoreTransactionManager {
37

    
38
	/**
39
	 * The Constant log.
40
	 */
41
	private static final Log log = LogFactory.getLog(MDStoreTransactionManagerImpl.class);
42

    
43
	/**
44
	 * The max number of concurrent transactions per mdstore.
45
	 */
46
	private int maxTransactions = 1;
47

    
48
	/**
49
	 * The expired days.
50
	 */
51
	private int expiredDays = 3;
52

    
53
	/**
54
	 * The db.
55
	 */
56
	@Autowired
57
	private MongoDatabase db;
58

    
59
	/**
60
	 * The manager table.
61
	 */
62
	private MongoCollection<MDStoreManagerInfo> managerTable;
63

    
64
	/**
65
	 * Bootstrap manager.
66
	 */
67
	@PostConstruct
68
	private void bootstrapManager() {
69
		log.debug("Bootstrap Manager start");
70
		final MongoCollection<MDStoreDescription> metadataColl = db.getCollection(METADATA_NAME, MDStoreDescription.class);
71
		managerTable = db.getCollection(METADATA_MANAGER_TABLE, MDStoreManagerInfo.class);
72
		metadataColl.find().iterator().forEachRemaining(desc -> {
73
			getManagerTable().insertOne(asManagerInfo(desc.getMdId()));
74
			log.debug(String.format("Added %s to Metadata Manager data structure", desc.getMdId()));
75
		});
76

    
77
		log.debug("Create index in MetadaManager");
78
		getManagerTable().createIndex(new BasicDBObject(MDStoreConstants.MDID, 1));
79
	}
80

    
81
	private MDStoreManagerInfo asManagerInfo(final String mdId) {
82
		return MDStoreManagerInfo.create()
83
				.setMdId(mdId)
84
				.setCurrentId(createInternalId(mdId));
85
	}
86

    
87
	/**
88
	 * {@inheritDoc}
89
	 */
90
	@Override
91
	public void verifyConsistency() throws MDStoreException {
92
		if (getManagerTable() == null) {
93
			final int size = DnetStreamSupport.generateStreamFromIterator(
94
					db.listCollectionNames().iterator())
95
					.filter(it -> it.contains(METADATA_MANAGER_TABLE))
96
					.collect(Collectors.toList())
97
					.size();
98
			if (size == 0) {
99
				bootstrapManager();
100
			} else {
101
				managerTable = db.getCollection(METADATA_MANAGER_TABLE, MDStoreManagerInfo.class);
102
			}
103
		}
104
		if (getManagerTable() == null) {
105
			throw new MDStoreException("Something bad happen, unable to create managerTable");
106
		}
107
	}
108

    
109
	/**
110
	 * {@inheritDoc}
111
	 *
112
	 * @see MDStoreTransactionManager#createMDStore(String)
113
	 */
114
	@Override
115
	public String createMDStore(final String mdId) throws MDStoreException {
116
		log.debug("Creating new mdstore");
117
		verifyConsistency();
118
		final MDStoreManagerInfo managerInfo = asManagerInfo(mdId);
119
		getManagerTable().insertOne(managerInfo);
120
		db.createCollection(managerInfo.getCurrentId());
121
		return managerInfo.getCurrentId();
122
	}
123

    
124
	private String createInternalId(final String mdId) {
125
		String newId = mdId;
126
		if (mdId.contains("_")) {
127
			newId = StringUtils.substringBefore(mdId, "_");
128
		}
129
		return newId;
130
	}
131

    
132
	/**
133
	 * {@inheritDoc}
134
	 *
135
	 * @see MDStoreTransactionManager#dropMDStore(String)
136
	 */
137
	@Override
138
	public void dropMDStore(final String mdId) throws MDStoreException {
139
		verifyConsistency();
140
		log.debug("Dropping MDStore: " + mdId);
141

    
142
		final MDStoreManagerInfo dropped = getManagerTable().findOneAndDelete(Filters.eq(MDID, mdId));
143
		garbage();
144
		doDrop(dropped.getCurrentId());
145
	}
146

    
147
	/**
148
	 * {@inheritDoc}
149
	 *
150
	 * @see MDStoreTransactionManager#getMDStoreCollection(String)
151
	 */
152
	@Override
153
	public String getMDStoreCollection(final String mdId) throws MDStoreException {
154
		verifyConsistency();
155
		return getInfoForCurrentMdStore(mdId).getCurrentId();
156
	}
157

    
158
	/**
159
	 * {@inheritDoc}
160
	 *
161
	 * @see MDStoreTransactionManager#startTransaction(String, boolean)
162
	 */
163
	@Override
164
	public String startTransaction(final String mdId, final boolean refresh) throws MDStoreException {
165
		verifyConsistency();
166
		log.info("Start transaction for metadata store " + mdId);
167
		final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId);
168
		final String transactionId = StringUtils.substringBefore(mdId, "_") + "::" + System.currentTimeMillis();
169

    
170
		if (info.getTransactions().size() > getMaxTransactions()) {
171
			throw new MDStoreException(
172
					String.format("Cannot create more than %s transactions, found: %s, mdId: %s", getMaxTransactions(), info.getTransactions().size(), mdId));
173
		}
174

    
175
		info.getTransactions().add(
176
				MDStoreTransactionInfo.create()
177
				.setId(transactionId)
178
				.setRefresh(refresh));
179

    
180
		getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
181
		return transactionId;
182
	}
183

    
184
	/**
185
	 * {@inheritDoc}
186
	 *
187
	 * @see MDStoreTransactionManager#commit(String, String, MDStore)
188
	 */
189
	@Override
190
	public boolean commit(final String transactionId, final String mdId, final MDStore current) throws MDStoreException {
191
		verifyConsistency();
192
		final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId);
193
		final List<MDStoreTransactionInfo> transactions = info.getTransactions();
194
		if (transactions.isEmpty()) {
195
			throw new MDStoreException("Error, unable to find transaction with Id " + transactionId);
196
		}
197
		final MDStoreTransactionInfo tx = findTransaction(transactions, transactionId);
198
		transactions.remove(tx);
199
		final String oldId = info.getCurrentId();
200
		if (tx.getRefresh()) {
201
			info.setCurrentId(transactionId);
202
			if (info.getExpiring().isEmpty()) {
203
				doDrop(oldId);
204
			}
205
			log.debug("Replaced collection ");
206
		} else {
207
			log.debug("commit incremental ");
208
			updateIncremental(transactionId, oldId);
209
			doDrop(transactionId);
210
		}
211
		getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
212

    
213
		log.info("Committed transaction for metadata store " + mdId);
214
		return true;
215
	}
216

    
217
	/**
218
	 * Find transaction.
219
	 *
220
	 * @param transactions  the transactions
221
	 * @param transactionId the transaction id
222
	 * @return the DB object
223
	 */
224
	private MDStoreTransactionInfo findTransaction(final List<MDStoreTransactionInfo> transactions, final String transactionId) throws MDStoreException {
225
		final Optional<MDStoreTransactionInfo> tx = transactions.stream()
226
				.filter(t -> transactionId.equals(t.getId())).findFirst();
227
		if (!tx.isPresent()) {
228
			throw new MDStoreException("Error, unable to find transaction with Id " + transactionId);
229
		} else {
230
			return tx.get();
231
		}
232
	}
233

    
234
	/**
235
	 * {@inheritDoc}
236
	 *
237
	 * @see MDStoreTransactionManager#readMdStore(String)
238
	 */
239
	@Override
240
	public String readMdStore(final String mdId) throws  MDStoreException {
241
		verifyConsistency();
242
		final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId);
243

    
244
		final String currentId = info.getCurrentId();
245
		final List<MDStoreExpiredInfo> expiring = info.getExpiring();
246
		updateMdstoreUsed(expiring, currentId);
247
		getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
248
		return currentId;
249
	}
250

    
251
	/**
252
	 * Update mdstore used.
253
	 *
254
	 * @param expiring the values to be updated
255
	 * @param mdId the mdstore id
256
	 */
257
	private void updateMdstoreUsed(final List<MDStoreExpiredInfo> expiring, final String mdId) {
258
		final Optional<MDStoreExpiredInfo> expiringInfo = expiring.stream()
259
				.filter(info -> mdId.equals(info.getId()))
260
				.findFirst();
261

    
262
		if (expiringInfo.isPresent()) {
263
			expiringInfo.get().setLastRead(new Date());
264
		} else {
265
			expiring.add(MDStoreExpiredInfo.create().setId(mdId));
266
		}
267
	}
268

    
269
	/**
270
	 * Gets the manager table.
271
	 *
272
	 * @return the managerTable
273
	 */
274
	private MongoCollection<MDStoreManagerInfo> getManagerTable() {
275
		return managerTable;
276
	}
277

    
278
	/**
279
	 * (non-Javadoc)
280
	 *
281
	 * @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
282
	 */
283
	@Override
284
	public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdId) throws  MDStoreException {
285
		verifyConsistency();
286
		final MDStoreManagerInfo managerInfo = getManagerTable().find(Filters.eq(MDID, mdId)).first();
287
		if (managerInfo == null) {
288
			throw new  MDStoreException("Error, unable to find MDStore " + mdId);
289
		}
290
		return managerInfo;
291
	}
292

    
293
	/**
294
	 * (non-Javadoc)
295
	 *
296
	 * @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
297
	 */
298
	@Override
299
	public Boolean dropUsed(final String mdId, final String idToDrop) throws  MDStoreException {
300
		verifyConsistency();
301
		final MDStoreManagerInfo managerInfo = getInfoForCurrentMdStore(mdId);
302

    
303
		final Optional<MDStoreExpiredInfo> info = managerInfo.getExpiring().stream()
304
				.filter(i -> idToDrop.equals(i.getId()))
305
				.findFirst();
306

    
307
		if (info.isPresent()) {
308
			doDrop(idToDrop);
309
			managerInfo.getExpiring().remove(info.get());
310

    
311
			getManagerTable().findOneAndReplace(Filters.eq(MDID, managerInfo.getMdId()), managerInfo);
312
			return true;
313
		} else {
314
			throw new MDStoreException("Error, unable to drop collection " + idToDrop);
315
		}
316
	}
317

    
318
	private void doDrop(final String mdId) {
319
		db.getCollection(mdId).drop();
320
		db.getCollection(DISCARDED_PREFIX + mdId).drop();
321
		log.debug("deleted collection " + mdId);
322
	}
323

    
324
	/**
325
	 * (non-Javadoc)
326
	 *
327
	 * @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#garbage()
328
	 */
329
	@Override
330
	public void garbage() throws  MDStoreException {
331
		verifyConsistency();
332
		log.info("Start garbage collection of MdStore");
333
		int totalDeleted = 0;
334
		for (MDStoreManagerInfo currentInfo : getManagerTable().find()) {
335
			if (log.isDebugEnabled()) {
336
				log.debug("start to check id: " + currentInfo.getCurrentId());
337
			}
338
			garbageExpiring(currentInfo);
339
			garbageTransactions(currentInfo);
340
			getManagerTable().findOneAndReplace(Filters.eq(MDID, currentInfo.getMdId()), currentInfo);
341
		}
342

    
343
		// DELETING Collection that are not in the metadataManager table
344

    
345
		for (String coll : db.listCollectionNames()) {
346
			if ((coll.length() > 30) && (!coll.contains(DISCARDED_PREFIX)) && (!coll.contains("resolved"))) {
347
				final MDStoreManagerInfo info = getMetadataObjectForCollections(coll);
348

    
349
				if (shouldDelete(coll, info)) {
350
					if (log.isDebugEnabled()) {
351
						log.debug("delete collection: " + coll + " from mongo");
352
					}
353
					doDrop(coll);
354
					if (log.isDebugEnabled()) {
355
						log.debug("delete collection: discarded-" + coll + " from mongo");
356
					}
357
				}
358
			}
359
		}
360

    
361
		log.info("Complete garbage collection of MdStore, total store deleted: " + totalDeleted);
362
	}
363

    
364
	private boolean shouldDelete(final String collection, final MDStoreManagerInfo managerInfo) {
365

    
366
		if (StringUtils.isBlank(managerInfo.getCurrentId())) {
367
			log.debug("the instance has not currentID");
368
			return true;
369
		}
370
		if (collection.equals(managerInfo.getCurrentId())) {
371
			return false;
372
		}
373
		if (managerInfo.getExpiring().stream().anyMatch(i -> collection.equals(i.getId()))) {
374
			return false;
375
		}
376
		return managerInfo.getTransactions().stream().anyMatch(i -> collection.equals(i.getId()));
377
	}
378

    
379
	private MDStoreManagerInfo getMetadataObjectForCollections(final String collection) throws MDStoreException {
380
		if (collection == null) {
381
			throw new MDStoreException("missing collection name");
382
		}
383
		final String tmp = collection.contains(
384
				DISCARDED_PREFIX) == true ? StringUtils.substringAfter(collection, DISCARDED_PREFIX) : collection;
385
		final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + MDSTORE_ID_SUFFIX;
386

    
387
		return getManagerTable().find(Filters.eq(MDID, collectionNameCleaned)).first();
388
	}
389

    
390
	/**
391
	 * Garbage transactions.
392
	 *
393
	 * @param currentInfo the current object
394
	 */
395
	private void garbageTransactions(final MDStoreManagerInfo currentInfo) {
396
		if (log.isDebugEnabled()) {
397
			log.debug("Start garbage transactions ");
398
		}
399

    
400
		if (currentInfo.getTransactions().size() <= getMaxTransactions()) {
401
			return;
402
		}
403

    
404
		final TreeSet<MDStoreTransactionInfo> expiringSet = Sets.newTreeSet(MDStoreUtils.getComparatorOnDate());
405
		expiringSet.addAll(currentInfo.getTransactions());
406

    
407
		int toRemove = 0;
408

    
409
		for(MDStoreTransactionInfo i : expiringSet) {
410
			if (currentInfo.getTransactions().size() - toRemove >= getMaxTransactions()) {
411
				return;
412
			}
413
			if (!i.getId().equals(currentInfo.getCurrentId())) {
414
				if (log.isDebugEnabled()) {
415
					log.debug("delete collection: " + i.getId() + " from mongo");
416
				}
417
				toRemove++;
418
				doDrop(i.getId());
419
			} else {
420
				log.debug("Cannot remove transaction " + i.getId() + " because is the currentId: " + currentInfo.getCurrentId());
421
			}
422
		}
423
		log.info("Deleted " + toRemove + " transactions, mdStore Id:" + currentInfo.getMdId());
424
	}
425

    
426
	/**
427
	 * Garbage expiring.
428
	 *
429
	 * @param currentInfo the current object
430
	 */
431
	private void garbageExpiring(final MDStoreManagerInfo currentInfo) {
432
		if (log.isDebugEnabled()) {
433
			log.debug("Start to search expiring mdstores for id: " + currentInfo.getMdId());
434
		}
435
		final List<MDStoreExpiredInfo> expiring = currentInfo.getExpiring();
436
		expiring.forEach(i -> {
437

    
438
			final LocalDate readDate = i.getLastRead().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
439
			final long d = Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays();
440
			if (d > getExpiredDays()) {
441
				if (!i.getId().equals(currentInfo.getCurrentId())) {
442
					doDrop(i.getId());
443
				}
444
			}
445
		});
446

    
447
		log.debug("Deleted expired collections, mdId: " + currentInfo.getMdId());
448
	}
449

    
450
	/**
451
	 * Update incremental.
452
	 *
453
	 * @param transactionId the transaction id
454
	 * @param currentId     the current id
455
	 */
456
	private void updateIncremental(final String transactionId, final String currentId) {
457
		final MongoCollection<MDStoreRecord> transaction = db.getCollection(transactionId, MDStoreRecord.class);
458
		final MongoCollection<MDStoreRecord> mdstore = db.getCollection(currentId, MDStoreRecord.class);
459

    
460
		for (MDStoreRecord record : transaction.find().noCursorTimeout(true)) {
461

    
462
			final MDStoreRecord r = record.clone();
463
			if (StringUtils.isNotBlank(r.getId())) {
464
				//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
465
				//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
466
				final MongoCollection<MDStoreRecord> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
467
				mdstoreWrite.replaceOne(Filters.eq(ID, r.getId()), r, new UpdateOptions().upsert(true));
468
			}
469
		}
470
	}
471

    
472
	/**
473
	 * (non-Javadoc)
474
	 *
475
	 * @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
476
	 */
477
	@Override
478
	public Boolean dropTransaction(final String mdId, final String idToDrop) throws  MDStoreException {
479
		verifyConsistency();
480
		final MDStoreManagerInfo managerInfo = getInfoForCurrentMdStore(mdId);
481
		managerInfo.getTransactions().removeIf(info -> {
482
			if (idToDrop.equals(info.getId())) {
483
				doDrop(idToDrop);
484
				return true;
485
			} else {
486
				return false;
487
			}
488
		});
489
		getManagerTable().findOneAndReplace(Filters.eq(MDID, managerInfo.getMdId()), managerInfo);
490
		return true;
491
	}
492

    
493
	public void garbageTransactionsOnStart() throws  MDStoreException {
494
		verifyConsistency();
495
		for (MDStoreManagerInfo info : getManagerTable().find()) {
496
			info.getTransactions().forEach(tx -> doDrop(tx.getId()));
497
			info.setTransactions(Lists.newArrayList());
498
			getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
499
		}
500
	}
501

    
502
	/**
503
	 * Gets the max transactions.
504
	 *
505
	 * @return the maxTransactions
506
	 */
507
	public int getMaxTransactions() {
508
		return maxTransactions;
509
	}
510

    
511
	/**
512
	 * Sets the max transactions.
513
	 *
514
	 * @param maxTransactions the maxTransactions to set
515
	 */
516
	public void setMaxTransactions(final int maxTransactions) {
517
		this.maxTransactions = maxTransactions;
518
	}
519

    
520
	/**
521
	 * Gets the expired days.
522
	 *
523
	 * @return the expiredDays
524
	 */
525
	public int getExpiredDays() {
526
		return expiredDays;
527
	}
528

    
529
	/**
530
	 * Sets the expired days.
531
	 *
532
	 * @param expiredDays the expiredDays to set
533
	 */
534
	public void setExpiredDays(final int expiredDays) {
535
		this.expiredDays = expiredDays;
536
	}
537
}
(2-2/6)