Project

General

Profile

« Previous | Next » 

Revision 46417

migrated mdstore implementation, early tests

View differences:

MDStoreTransactionManagerImpl.java
3 3
import java.time.Duration;
4 4
import java.time.LocalDate;
5 5
import java.time.ZoneId;
6
import java.util.Collections;
7 6
import java.util.Date;
8 7
import java.util.List;
8
import java.util.Optional;
9
import java.util.TreeSet;
9 10
import java.util.stream.Collectors;
11
import javax.annotation.PostConstruct;
10 12

  
11 13
import com.google.common.collect.Lists;
12
import com.mongodb.*;
13
import com.mongodb.client.FindIterable;
14
import com.google.common.collect.Sets;
15
import com.mongodb.BasicDBObject;
16
import com.mongodb.WriteConcern;
14 17
import com.mongodb.client.MongoCollection;
15 18
import com.mongodb.client.MongoDatabase;
16
import com.mongodb.client.MongoIterable;
17 19
import com.mongodb.client.model.Filters;
18 20
import com.mongodb.client.model.UpdateOptions;
19 21
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
20 22
import eu.dnetlib.msro.workers.aggregation.mdstore.*;
21
import eu.dnetlib.msro.workers.aggregation.mdstore.model.MDStoreExpiredInfo;
22
import eu.dnetlib.msro.workers.aggregation.mdstore.model.MDStoreManagerInfo;
23
import eu.dnetlib.msro.workers.aggregation.mdstore.model.MDStoreTransactionInfo;
23
import eu.dnetlib.msro.workers.aggregation.mdstore.model.*;
24 24
import org.apache.commons.lang3.StringUtils;
25 25
import org.apache.commons.logging.Log;
26 26
import org.apache.commons.logging.LogFactory;
27
import org.bson.conversions.Bson;
28 27
import org.springframework.beans.factory.annotation.Autowired;
29 28
import org.springframework.stereotype.Component;
30 29

  
30
import static eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreConstants.*;
31

  
31 32
/**
32 33
 * Created by claudio on 13/03/2017.
33 34
 */
......
45 46
	private int maxTransactions = 1;
46 47

  
47 48
	/**
49
	 * The expired days.
50
	 */
51
	private int expiredDays = 3;
52

  
53
	/**
48 54
	 * The db.
49 55
	 */
50 56
	@Autowired
......
53 59
	/**
54 60
	 * The manager table.
55 61
	 */
56
	private MongoCollection<DBObject> managerTable;
62
	private MongoCollection<MDStoreManagerInfo> managerTable;
57 63

  
58 64
	/**
59
	 * The expired days.
60
	 */
61
	private int expiredDays;
62

  
63
	/**
64 65
	 * Bootstrap manager.
65 66
	 */
67
	@PostConstruct
66 68
	private void bootstrapManager() {
67 69
		log.debug("Bootstrap Manager start");
68
		final MongoCollection<DBObject> metadataColl = db.getCollection(MDStoreConstants.METADATA_NAME, DBObject.class);
69
		final FindIterable<DBObject> values = metadataColl.find();
70
		this.setManagerTable(db.getCollection(MDStoreConstants.METADATA_MANAGER_TABLE, DBObject.class));
71
		for (DBObject object : values) {
72
			final String id = (String) object.get(MDStoreConstants.MDID);
73
			String newId = id;
74
			if (id.contains("_")) {
75
				newId = StringUtils.substringBefore(id, "_");
76
			}
77
			final DBObject input = BasicDBObjectBuilder.start()
78
				.add(MDStoreConstants.MDID, id)
79
				.add(MDStoreConstants.CURRENT_ID, newId)
80
				.add(MDStoreConstants.EXPIRING, new String[] {})
81
				.add(MDStoreConstants.TRANSACTIONS, new String[] {})
82
				.get();
83
			getManagerTable().insertOne(input);
84
			log.debug(String.format("Added %s to Metadata Manager data structure", id));
70
		final MongoCollection<MDStoreDescription> metadataColl = db.getCollection(METADATA_NAME, MDStoreDescription.class);
71
		managerTable = db.getCollection(METADATA_MANAGER_TABLE, MDStoreManagerInfo.class);
72
		metadataColl.find().iterator().forEachRemaining(desc -> {
73
			getManagerTable().insertOne(asManagerInfo(desc.getMdId()));
74
			log.debug(String.format("Added %s to Metadata Manager data structure", desc.getMdId()));
75
		});
85 76

  
86
		}
87
		final BasicDBObject ensureIndex = new BasicDBObject();
88
		ensureIndex.put(MDStoreConstants.MDID, 1);
89
		log.debug("Create index in MetadaManager ");
90
		this.getManagerTable().createIndex(ensureIndex);
77
		log.debug("Create index in MetadaManager");
78
		getManagerTable().createIndex(new BasicDBObject(MDStoreConstants.MDID, 1));
91 79
	}
92 80

  
93
	/**
94
	 * Gets the DBObject describing an mdstore. null if there is no mdstore with the given id.
95
	 *
96
	 * @param mdstoreID
97
	 * @return DBObject or null
98
	 */
99
	private DBObject getMDStoreAsDBObject(String mdstoreID) {
100
		final BasicDBObject query = new BasicDBObject();
101
		query.put(MDStoreConstants.MDID, mdstoreID);
102
		return getManagerTable().find(query).first();
81
	private MDStoreManagerInfo asManagerInfo(final String mdId) {
82
		return MDStoreManagerInfo.create()
83
				.setMdId(mdId)
84
				.setCurrentId(createInternalId(mdId));
103 85
	}
104 86

  
105 87
	/**
......
107 89
	 */
108 90
	@Override
109 91
	public void verifyConsistency() throws MDStoreException {
110
		if (this.getManagerTable() == null) {
92
		if (getManagerTable() == null) {
111 93
			final int size = DnetStreamSupport.generateStreamFromIterator(
112 94
					db.listCollectionNames().iterator())
113
					.filter(it -> it.contains(MDStoreConstants.METADATA_MANAGER_TABLE))
95
					.filter(it -> it.contains(METADATA_MANAGER_TABLE))
114 96
					.collect(Collectors.toList())
115 97
					.size();
116

  
117
			if (size == 0)
98
			if (size == 0) {
118 99
				bootstrapManager();
119
			else {
120
				this.setManagerTable(db.getCollection(MDStoreConstants.METADATA_MANAGER_TABLE, DBObject.class));
100
			} else {
101
				managerTable = db.getCollection(METADATA_MANAGER_TABLE, MDStoreManagerInfo.class);
121 102
			}
122 103
		}
123
		if (this.getManagerTable() == null) throw new MDStoreException("Something bad happen, unable to create managerTable");
104
		if (getManagerTable() == null) {
105
			throw new MDStoreException("Something bad happen, unable to create managerTable");
106
		}
124 107
	}
125 108

  
126 109
	/**
......
129 112
	 * @see MDStoreTransactionManager#createMDStore(String)
130 113
	 */
131 114
	@Override
132
	public void createMDStore(final String mdId) throws MDStoreException {
115
	public String createMDStore(final String mdId) throws MDStoreException {
133 116
		log.debug("Creating new mdstore");
134 117
		verifyConsistency();
118
		final MDStoreManagerInfo managerInfo = asManagerInfo(mdId);
119
		getManagerTable().insertOne(managerInfo);
120
		db.createCollection(managerInfo.getCurrentId());
121
		return managerInfo.getCurrentId();
122
	}
123

  
124
	private String createInternalId(final String mdId) {
135 125
		String newId = mdId;
136 126
		if (mdId.contains("_")) {
137 127
			newId = StringUtils.substringBefore(mdId, "_");
138 128
		}
139
		final BasicDBObject instance = new BasicDBObject();
140
		instance.put(MDStoreConstants.MDID, mdId);
141
		instance.put(MDStoreConstants.CURRENT_ID, newId);
142
		instance.put(MDStoreConstants.EXPIRING, new String[] {});
143
		getManagerTable().insertOne(instance);
129
		return newId;
144 130
	}
145 131

  
146 132
	/**
......
151 137
	@Override
152 138
	public void dropMDStore(final String mdId) throws MDStoreException {
153 139
		verifyConsistency();
154
		log.debug("Droping MDStore: " + mdId);
155
		final BasicDBObject query = new BasicDBObject();
156
		query.put(MDStoreConstants.MDID, mdId);
157
		final DBObject dropped = this.getManagerTable().findOneAndDelete(query);
140
		log.debug("Dropping MDStore: " + mdId);
141

  
142
		final MDStoreManagerInfo dropped = getManagerTable().findOneAndDelete(Filters.eq(MDID, mdId));
158 143
		garbage();
159
		final String collectionName = (String) dropped.get(MDStoreConstants.CURRENT_ID);
160
		this.db.getCollection(collectionName).drop();
161
		this.db.getCollection(MDStoreConstants.DISCARDED_PREFIX + collectionName).drop();
144
		doDrop(dropped.getCurrentId());
162 145
	}
163 146

  
164 147
	/**
......
169 152
	@Override
170 153
	public String getMDStoreCollection(final String mdId) throws MDStoreException {
171 154
		verifyConsistency();
172
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
173
		if (mdstoreInfo != null) {
174
			return (String) mdstoreInfo.get(MDStoreConstants.CURRENT_ID);
175
		} else {
176
			throw new MDStoreException("unable to find MDStore " + mdId);
177
		}
155
		return getInfoForCurrentMdStore(mdId).getCurrentId();
178 156
	}
179 157

  
180 158
	/**
......
186 164
	public String startTransaction(final String mdId, final boolean refresh) throws MDStoreException {
187 165
		verifyConsistency();
188 166
		log.info("Start transaction for metadata store " + mdId);
189
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
190
		if (mdstoreInfo == null) throw new MDStoreException("Error, unable to find Mdstore with Id " + mdId);
191
		String idCreation = StringUtils.substringBefore(mdId, "_");
192
		idCreation = idCreation + "::" + System.currentTimeMillis();
167
		final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId);
168
		final String transactionId = StringUtils.substringBefore(mdId, "_") + "::" + System.currentTimeMillis();
193 169

  
194
		BasicDBList values;
195
		if (mdstoreInfo.containsField(MDStoreConstants.TRANSACTIONS)) {
196
			values = (BasicDBList) mdstoreInfo.get(MDStoreConstants.TRANSACTIONS);
197
			if (values.size() > getMaxTransactions())
198
				throw new MDStoreException(String.format("Cannot create more than %s transactions, found: %s, mdId: %s", getMaxTransactions(), values.size(), mdId));
199
		} else {
200
			values = new BasicDBList();
170
		if (info.getTransactions().size() > getMaxTransactions()) {
171
			throw new MDStoreException(
172
					String.format("Cannot create more than %s transactions, found: %s, mdId: %s", getMaxTransactions(), info.getTransactions().size(), mdId));
201 173
		}
202
		final BasicDBObject transactionMetadata = new BasicDBObject();
203
		transactionMetadata.put(MDStoreConstants.ID, idCreation.toString());
204
		transactionMetadata.put(MDStoreConstants.REFRESH, refresh);
205
		transactionMetadata.put(MDStoreConstants.DATE, new Date());
206
		values.add(transactionMetadata);
207
		mdstoreInfo.put(MDStoreConstants.TRANSACTIONS, values);
208
		getManagerTable().findOneAndReplace(new BasicDBObject(MDStoreConstants.INTERNAL_MONGO_ID, mdstoreInfo.get(MDStoreConstants.INTERNAL_MONGO_ID)), mdstoreInfo);
209
		return idCreation.toString();
174

  
175
		info.getTransactions().add(
176
				MDStoreTransactionInfo.create()
177
				.setId(transactionId)
178
				.setRefresh(refresh));
179

  
180
		getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
181
		return transactionId;
210 182
	}
211 183

  
212 184
	/**
213 185
	 * {@inheritDoc}
214 186
	 *
215
	 * @see MDStoreTransactionManager#commit(String, String,
216
	 * MDStore)
187
	 * @see MDStoreTransactionManager#commit(String, String, MDStore)
217 188
	 */
218 189
	@Override
219
	public boolean commit(final String transactionId, final String mdstoreId, final MDStore current) throws MDStoreException {
190
	public boolean commit(final String transactionId, final String mdId, final MDStore current) throws MDStoreException {
220 191
		verifyConsistency();
221
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdstoreId);
222
		if (mdstoreInfo == null) {
223
			throw new MDStoreException("Error, unable to find Mdstore with Id " + mdstoreId);
224
		}
225
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get(MDStoreConstants.TRANSACTIONS);
192
		final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId);
193
		final List<MDStoreTransactionInfo> transactions = info.getTransactions();
226 194
		if (transactions.isEmpty()) {
227 195
			throw new MDStoreException("Error, unable to find transaction with Id " + transactionId);
228 196
		}
229
		final DBObject transaction = findTransaction(transactions, transactionId);
230
		final boolean refresh = (Boolean) transaction.get(MDStoreConstants.REFRESH);
231
		transactions.remove(transaction);
232
		final String oldId = (String) mdstoreInfo.get(MDStoreConstants.CURRENT_ID);
233
		if (refresh) {
234
			mdstoreInfo.put(MDStoreConstants.CURRENT_ID, transactionId);
235
			final BasicDBList stillUsed = (BasicDBList) mdstoreInfo.get(MDStoreConstants.EXPIRING);
236
			if (stillUsed.size() == 0) {
237
				db.getCollection(oldId).drop();
238
				db.getCollection(MDStoreConstants.DISCARDED_PREFIX + oldId).drop();
197
		final MDStoreTransactionInfo tx = findTransaction(transactions, transactionId);
198
		transactions.remove(tx);
199
		final String oldId = info.getCurrentId();
200
		if (tx.getRefresh()) {
201
			info.setCurrentId(transactionId);
202
			if (info.getExpiring().isEmpty()) {
203
				doDrop(oldId);
239 204
			}
240 205
			log.debug("Replaced collection ");
241 206
		} else {
242 207
			log.debug("commit incremental ");
243 208
			updateIncremental(transactionId, oldId);
244
			db.getCollection(transactionId).drop();
245
			db.getCollection(MDStoreConstants.DISCARDED_PREFIX + transactionId).drop();
209
			doDrop(transactionId);
246 210
		}
247
		this.getManagerTable().findOneAndReplace(new BasicDBObject(MDStoreConstants.INTERNAL_MONGO_ID, mdstoreInfo.get(MDStoreConstants.INTERNAL_MONGO_ID)), mdstoreInfo);
211
		getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
248 212

  
249
		log.info("Committed transaction for metadata store " + mdstoreId);
213
		log.info("Committed transaction for metadata store " + mdId);
250 214
		return true;
251 215
	}
252 216

  
......
257 221
	 * @param transactionId the transaction id
258 222
	 * @return the DB object
259 223
	 */
260
	private DBObject findTransaction(final BasicDBList transactions, final String transactionId) throws MDStoreException {
261
		if (transactions.size() == 0) return null;
262
		for (int i = 0; i < transactions.size(); i++) {
263
			final BasicDBObject transaction = (BasicDBObject) transactions.get(i);
264
			if (transactionId.equals(transaction.getString(MDStoreConstants.ID))) {
265
				return transaction;
266
			}
224
	private MDStoreTransactionInfo findTransaction(final List<MDStoreTransactionInfo> transactions, final String transactionId) throws MDStoreException {
225
		final Optional<MDStoreTransactionInfo> tx = transactions.stream()
226
				.filter(t -> transactionId.equals(t.getId())).findFirst();
227
		if (!tx.isPresent()) {
228
			throw new MDStoreException("Error, unable to find transaction with Id " + transactionId);
229
		} else {
230
			return tx.get();
267 231
		}
268
		throw new MDStoreException("Error, unable to find transaction with Id " + transactionId);
269 232
	}
270 233

  
271 234
	/**
......
274 237
	 * @see MDStoreTransactionManager#readMdStore(String)
275 238
	 */
276 239
	@Override
277
	public String readMdStore(final String mdStoreId) throws  MDStoreException {
240
	public String readMdStore(final String mdId) throws  MDStoreException {
278 241
		verifyConsistency();
279
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
280
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdStoreId);
281
		final String currentId = (String) mdstoreInfo.get(MDStoreConstants.CURRENT_ID);
282
		final BasicDBList values = (BasicDBList) mdstoreInfo.get(MDStoreConstants.EXPIRING);
283
		updateMdstoreUsed(values, currentId);
284
		this.getManagerTable().findOneAndReplace(new BasicDBObject(MDStoreConstants.INTERNAL_MONGO_ID, mdstoreInfo.get(MDStoreConstants.INTERNAL_MONGO_ID)), mdstoreInfo);
242
		final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId);
243

  
244
		final String currentId = info.getCurrentId();
245
		final List<MDStoreExpiredInfo> expiring = info.getExpiring();
246
		updateMdstoreUsed(expiring, currentId);
247
		getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
285 248
		return currentId;
286 249
	}
287 250

  
288 251
	/**
289 252
	 * Update mdstore used.
290 253
	 *
291
	 * @param values the values
292
	 * @param mdId   the md id
254
	 * @param expiring the values to be updated
255
	 * @param mdId the mdstore id
293 256
	 */
294
	private void updateMdstoreUsed(final BasicDBList values, final String mdId) {
295
		if (values.size() > 0) {
296
			for (int i = 0; i < values.size(); i++) {
297
				final DBObject obj = (DBObject) values.get(i);
298
				final String id = (String) obj.get(MDStoreConstants.ID);
299
				if (mdId.equals(id)) {
300
					obj.put(MDStoreConstants.LAST_READ, new Date());
301
					return;
302
				}
303
			}
257
	private void updateMdstoreUsed(final List<MDStoreExpiredInfo> expiring, final String mdId) {
258
		final Optional<MDStoreExpiredInfo> expiringInfo = expiring.stream()
259
				.filter(info -> mdId.equals(info.getId()))
260
				.findFirst();
261

  
262
		if (expiringInfo.isPresent()) {
263
			expiringInfo.get().setLastRead(new Date());
264
		} else {
265
			expiring.add(MDStoreExpiredInfo.create().setId(mdId));
304 266
		}
305
		final BasicDBObject readStore = new BasicDBObject();
306
		readStore.put(MDStoreConstants.ID, mdId);
307
		readStore.put(MDStoreConstants.LAST_READ, new Date());
308
		values.add(readStore);
309 267
	}
310 268

  
311 269
	/**
......
313 271
	 *
314 272
	 * @return the managerTable
315 273
	 */
316
	public MongoCollection<DBObject> getManagerTable() {
274
	private MongoCollection<MDStoreManagerInfo> getManagerTable() {
317 275
		return managerTable;
318 276
	}
319 277

  
320 278
	/**
321
	 * Sets the manager table.
322
	 *
323
	 * @param managerTable the managerTable to set
324
	 */
325
	public void setManagerTable(final MongoCollection<DBObject> managerTable) {
326
		this.managerTable = managerTable;
327
	}
328

  
329
	/*
330 279
	 * (non-Javadoc)
331 280
	 *
332
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
281
	 * @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
333 282
	 */
334 283
	@Override
335
	public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdStoreId) throws  MDStoreException {
284
	public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdId) throws  MDStoreException {
336 285
		verifyConsistency();
337
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
338
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdStoreId);
339
		final MDStoreManagerInfo result = new MDStoreManagerInfo();
340
		result.setCurrentId((String) mdstoreInfo.get(MDStoreConstants.CURRENT_ID));
341
		result.setMdId((String) mdstoreInfo.get(MDStoreConstants.MDID));
342
		final BasicDBList values = (BasicDBList) mdstoreInfo.get(MDStoreConstants.EXPIRING);
343
		for (int i = 0; i < values.size(); i++) {
344
			final MDStoreExpiredInfo stillused = new MDStoreExpiredInfo();
345
			final DBObject value = (DBObject) values.get(i);
346
			stillused.setId((String) value.get(MDStoreConstants.ID));
347
			stillused.setLastRead((Date) value.get(MDStoreConstants.LAST_READ));
348
			result.addExpiredItem(stillused);
286
		final MDStoreManagerInfo managerInfo = getManagerTable().find(Filters.eq(MDID, mdId)).first();
287
		if (managerInfo == null) {
288
			throw new  MDStoreException("Error, unable to find MDStore " + mdId);
349 289
		}
350
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get(MDStoreConstants.TRANSACTIONS);
351
		if (transactions != null) {
352
			for (int i = 0; i < transactions.size(); i++) {
353
				final MDStoreTransactionInfo transaction = new MDStoreTransactionInfo();
354
				final DBObject value = (DBObject) transactions.get(i);
355
				final String transactionId = (String) value.get(MDStoreConstants.ID);
356
				transaction.setId(transactionId);
357
				transaction.setDate((Date) value.get(MDStoreConstants.DATE));
358
				transaction.setRefresh((Boolean) value.get(MDStoreConstants.REFRESH));
359
				transaction.setSize(db.getCollection(transactionId).count());
360
				result.addTransactionInfo(transaction);
361
			}
362
		}
363
		return result;
290
		return managerInfo;
364 291
	}
365 292

  
366
	/*
293
	/**
367 294
	 * (non-Javadoc)
368 295
	 *
369
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
296
	 * @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
370 297
	 */
371 298
	@Override
372 299
	public Boolean dropUsed(final String mdId, final String idToDrop) throws  MDStoreException {
373 300
		verifyConsistency();
374
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
375
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdId);
376
		return dropStore(mdstoreInfo, idToDrop, MDStoreConstants.EXPIRING);
377
	}
301
		final MDStoreManagerInfo managerInfo = getInfoForCurrentMdStore(mdId);
378 302

  
379
	private boolean dropStore(DBObject mdstoreInfo, final String idToDrop, String transactionListName) throws  MDStoreException {
380
		final BasicDBList transactionList = (BasicDBList) mdstoreInfo.get(transactionListName);
381
		for (int i = 0; i < transactionList.size(); i++) {
382
			final DBObject value = (DBObject) transactionList.get(i);
383
			final String currentUsedId = (String) value.get(MDStoreConstants.ID);
384
			if (currentUsedId.equals(idToDrop)) {
385
				db.getCollection(idToDrop).drop();
386
				db.getCollection(MDStoreConstants.DISCARDED_PREFIX + idToDrop).drop();
387
				transactionList.remove(value);
388
				mdstoreInfo.put(transactionListName, transactionList);
389
				this.getManagerTable().findOneAndReplace(new BasicDBObject(
390
						MDStoreConstants.INTERNAL_MONGO_ID, mdstoreInfo.get(MDStoreConstants.INTERNAL_MONGO_ID)), mdstoreInfo);
391
				return true;
392
			}
303
		final Optional<MDStoreExpiredInfo> info = managerInfo.getExpiring().stream()
304
				.filter(i -> idToDrop.equals(i.getId()))
305
				.findFirst();
306

  
307
		if (info.isPresent()) {
308
			doDrop(idToDrop);
309
			managerInfo.getExpiring().remove(info.get());
310

  
311
			getManagerTable().findOneAndReplace(Filters.eq(MDID, managerInfo.getMdId()), managerInfo);
312
			return true;
313
		} else {
314
			throw new MDStoreException("Error, unable to drop collection " + idToDrop);
393 315
		}
394
		throw new  MDStoreException("Error, unable to drop collection " + idToDrop);
395 316
	}
396 317

  
397
	/*
318
	private void doDrop(final String mdId) {
319
		db.getCollection(mdId).drop();
320
		db.getCollection(DISCARDED_PREFIX + mdId).drop();
321
		log.debug("deleted collection " + mdId);
322
	}
323

  
324
	/**
398 325
	 * (non-Javadoc)
399 326
	 *
400
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#garbage()
327
	 * @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#garbage()
401 328
	 */
402 329
	@Override
403 330
	public void garbage() throws  MDStoreException {
404 331
		verifyConsistency();
405 332
		log.info("Start garbage collection of MdStore");
406
		final FindIterable<DBObject> it = this.managerTable.find();
407 333
		int totalDeleted = 0;
408
		for (DBObject currentObject : it) {
334
		for (MDStoreManagerInfo currentInfo : getManagerTable().find()) {
409 335
			if (log.isDebugEnabled()) {
410
				log.debug("start to check id: " + currentObject.get(MDStoreConstants.CURRENT_ID));
336
				log.debug("start to check id: " + currentInfo.getCurrentId());
411 337
			}
412
			garbageExpiring(currentObject, (String) currentObject.get(MDStoreConstants.CURRENT_ID));
413
			garbageTransactions(currentObject, (String) currentObject.get(MDStoreConstants.CURRENT_ID));
414
			this.getManagerTable().findOneAndReplace(new BasicDBObject(MDStoreConstants.INTERNAL_MONGO_ID, currentObject.get(MDStoreConstants.INTERNAL_MONGO_ID)), currentObject);
338
			garbageExpiring(currentInfo);
339
			garbageTransactions(currentInfo);
340
			getManagerTable().findOneAndReplace(Filters.eq(MDID, currentInfo.getMdId()), currentInfo);
415 341
		}
416 342

  
417 343
		// DELETING Collection that are not in the metadataManager table
418
		MongoIterable<String> collections = this.db.listCollectionNames();
419
		for (String collection : collections) {
420
			if ((collection.length() > 30) && (collection.contains(MDStoreConstants.DISCARDED_PREFIX) == false) && (collection.contains("resolved") == false)) {
421
				final DBObject item = getMetadataObjectForCollections(collection);
422 344

  
423
				if (shouldDelete(collection, item)) {
345
		for (String coll : db.listCollectionNames()) {
346
			if ((coll.length() > 30) && (!coll.contains(DISCARDED_PREFIX)) && (!coll.contains("resolved"))) {
347
				final MDStoreManagerInfo info = getMetadataObjectForCollections(coll);
348

  
349
				if (shouldDelete(coll, info)) {
424 350
					if (log.isDebugEnabled()) {
425
						log.debug("delete collection: " + collection + " from mongo");
351
						log.debug("delete collection: " + coll + " from mongo");
426 352
					}
427
					db.getCollection(collection).drop();
428
					db.getCollection(MDStoreConstants.DISCARDED_PREFIX + collection).drop();
353
					doDrop(coll);
429 354
					if (log.isDebugEnabled()) {
430
						log.debug("delete collection: discarded-" + collection + " from mongo");
355
						log.debug("delete collection: discarded-" + coll + " from mongo");
431 356
					}
432 357
				}
433 358
			}
......
436 361
		log.info("Complete garbage collection of MdStore, total store deleted: " + totalDeleted);
437 362
	}
438 363

  
439
	private DBObject getMetadataObjectForCollections(final String collectionName) {
440
		if (collectionName == null) return null;
441
		final String tmp = collectionName.contains(
442
				MDStoreConstants.DISCARDED_PREFIX) == true ? StringUtils.substringAfter(collectionName, MDStoreConstants.DISCARDED_PREFIX) : collectionName;
443
		final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + MDStoreConstants.MDSTORE_ID_SUFFIX;
364
	private boolean shouldDelete(final String collection, final MDStoreManagerInfo managerInfo) {
444 365

  
445
		//DBObject query = QueryBuilder.start("mdId").is(collectionNameCleaned).get();
446
		Bson query = Filters.eq(MDStoreConstants.MDID, collectionNameCleaned);
447
		return this.managerTable.find(query).first();
448

  
449
	}
450

  
451
	private boolean shouldDelete(final String collectionName, final DBObject metadataManagerInstance) {
452
		log.debug("should delete instance " + metadataManagerInstance);
453
		if ((metadataManagerInstance == null) || (metadataManagerInstance.get(MDStoreConstants.CURRENT_ID) == null)) {
366
		if (StringUtils.isBlank(managerInfo.getCurrentId())) {
454 367
			log.debug("the instance has not currentID");
455 368
			return true;
456 369
		}
457
		String currentId = (String) metadataManagerInstance.get(MDStoreConstants.CURRENT_ID);
458
		if (collectionName.equals(currentId)) return false;
459
		BasicDBList expiringList = (BasicDBList) metadataManagerInstance.get(MDStoreConstants.EXPIRING);
460
		if (findInList(expiringList, collectionName, MDStoreConstants.ID) == true) return false;
461
		BasicDBList transactionsList = (BasicDBList) metadataManagerInstance.get(MDStoreConstants.TRANSACTIONS);
462
		return findInList(transactionsList, collectionName, MDStoreConstants.ID) != true;
370
		if (collection.equals(managerInfo.getCurrentId())) {
371
			return false;
372
		}
373
		if (managerInfo.getExpiring().stream().anyMatch(i -> collection.equals(i.getId()))) {
374
			return false;
375
		}
376
		return managerInfo.getTransactions().stream().anyMatch(i -> collection.equals(i.getId()));
463 377
	}
464 378

  
465
	private boolean findInList(final BasicDBList list, final String object, final String tagname) {
466
		if (list == null) return false;
467
		for (int i = 0; i < list.size(); i++) {
468
			DBObject currentObject = (DBObject) list.get(i);
469
			final String id = (String) currentObject.get(tagname);
470
			if (id.equals(object)) return true;
379
	private MDStoreManagerInfo getMetadataObjectForCollections(final String collection) throws MDStoreException {
380
		if (collection == null) {
381
			throw new MDStoreException("missing collection name");
471 382
		}
472
		return false;
473
	}
383
		final String tmp = collection.contains(
384
				DISCARDED_PREFIX) == true ? StringUtils.substringAfter(collection, DISCARDED_PREFIX) : collection;
385
		final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + MDSTORE_ID_SUFFIX;
474 386

  
475
	/**
476
	 * Delete.
477
	 *
478
	 * @param list     the list
479
	 * @param toRemove the to remove
480
	 */
481
	private void delete(final BasicDBList list, final List<DBObject> toRemove) {
482
		for (final DBObject obj : toRemove) {
483
			if (log.isDebugEnabled()) {
484
				log.debug("deleting " + obj);
485
			}
486
			list.remove(obj);
487
		}
387
		return getManagerTable().find(Filters.eq(MDID, collectionNameCleaned)).first();
488 388
	}
489 389

  
490 390
	/**
491 391
	 * Garbage transactions.
492 392
	 *
493
	 * @param currentObject the current object
494
	 * @param currentId     the current id
393
	 * @param currentInfo the current object
495 394
	 */
496
	private void garbageTransactions(final DBObject currentObject, final String currentId) {
395
	private void garbageTransactions(final MDStoreManagerInfo currentInfo) {
497 396
		if (log.isDebugEnabled()) {
498 397
			log.debug("Start garbage transactions ");
499 398
		}
500 399

  
501
		final BasicDBList expiring = (BasicDBList) currentObject.get(MDStoreConstants.TRANSACTIONS);
502
		if ((expiring == null) || (expiring.size() <= getMaxTransactions())) return;
503

  
504
		List<DBObject> expiringList = Lists.newArrayList();
505

  
506
		for (int i = 0; i < expiring.size(); i++) {
507
			final DBObject cobj = (DBObject) expiring.get(i);
508
			if (cobj != null) {
509
				expiringList.add((DBObject) expiring.get(i));
510
			}
511

  
400
		if (currentInfo.getTransactions().size() <= getMaxTransactions()) {
401
			return;
512 402
		}
513 403

  
514
		Collections.sort(expiringList, MDStoreUtils.getComparatorOnDate());
404
		final TreeSet<MDStoreTransactionInfo> expiringSet = Sets.newTreeSet(MDStoreUtils.getComparatorOnDate());
405
		expiringSet.addAll(currentInfo.getTransactions());
515 406

  
516
		List<DBObject> toRemove = Lists.newArrayList();
517
		int i = 0;
407
		int toRemove = 0;
518 408

  
519
		// We should remove the k item less recent
520
		// where k = numberOftotalTransaction - maxNumberOfTransaction
521
		// k = numberOfItemToRemove
522

  
523
		while (((expiringList.size() - toRemove.size()) > getMaxTransactions()) || (i < expiringList.size())) {
524
			DBObject currentObj = expiringList.get(i++);
525
			String objectId = (String) currentObj.get(MDStoreConstants.ID);
526
			if (!objectId.equals(currentId)) {
409
		for(MDStoreTransactionInfo i : expiringSet) {
410
			if (currentInfo.getTransactions().size() - toRemove >= getMaxTransactions()) {
411
				return;
412
			}
413
			if (!i.getId().equals(currentInfo.getCurrentId())) {
527 414
				if (log.isDebugEnabled()) {
528
					log.debug("delete collection: " + objectId + " from mongo");
415
					log.debug("delete collection: " + i.getId() + " from mongo");
529 416
				}
530
				db.getCollection(objectId).drop();
531
				db.getCollection(MDStoreConstants.DISCARDED_PREFIX + objectId).drop();
532
				if (log.isDebugEnabled()) {
533
					log.debug("delete collection: discarded-" + objectId + " from mongo");
534
				}
535
				toRemove.add(currentObj);
417
				toRemove++;
418
				doDrop(i.getId());
536 419
			} else {
537
				if (log.isDebugEnabled()) {
538
					log.debug("Cannot remove transaction " + objectId + " because is the currentId: " + currentId);
539
				}
420
				log.debug("Cannot remove transaction " + i.getId() + " because is the currentId: " + currentInfo.getCurrentId());
540 421
			}
541 422
		}
542

  
543
		delete(expiring, toRemove);
544
		log.info("Deleted " + toRemove.size() + " transactions, mdStore Id:" + currentObject.get(MDStoreConstants.MDID));
423
		log.info("Deleted " + toRemove + " transactions, mdStore Id:" + currentInfo.getMdId());
545 424
	}
546 425

  
547 426
	/**
548 427
	 * Garbage expiring.
549 428
	 *
550
	 * @param currentObject the current object
551
	 * @param currentId     the current id
429
	 * @param currentInfo the current object
552 430
	 */
553
	private void garbageExpiring(final DBObject currentObject, final String currentId) {
431
	private void garbageExpiring(final MDStoreManagerInfo currentInfo) {
554 432
		if (log.isDebugEnabled()) {
555
			log.debug("Start to search expiring mdstores for id: " + currentObject.get(MDStoreConstants.MDID));
433
			log.debug("Start to search expiring mdstores for id: " + currentInfo.getMdId());
556 434
		}
557
		final BasicDBList expiring = (BasicDBList) currentObject.get(MDStoreConstants.EXPIRING);
558
		final List<DBObject> toRemove = Lists.newArrayList();
559
		if (log.isDebugEnabled()) {
560
			if (expiring == null) {
561
				log.debug("expiring list is null");
562
			} else {
563
				log.debug("expiring list size is :" + expiring.size());
564
			}
565
		}
566
		if ((expiring == null) || (expiring.size() == 0)) {
567
			log.debug("Deleted  0  expired  collections, mdStore Id:" + currentObject.get(MDStoreConstants.MDID));
568
			return;
569
		}
570
		for (int i = 0; i < expiring.size(); i++) {
571
			final DBObject currentExpiredStore = (DBObject) expiring.get(i);
572
			final String currentUsedId = (String) currentExpiredStore.get(MDStoreConstants.ID);
573
			final long d = getExpiringDays(currentExpiredStore, MDStoreConstants.LAST_READ);
574
			if (log.isDebugEnabled()) {
575
				log.debug("the store :" + currentId + " expired since " + d + "days ");
576
			}
577
			// DELETE the collection where the last time they was read
578
			// is more than 3 days ago
435
		final List<MDStoreExpiredInfo> expiring = currentInfo.getExpiring();
436
		expiring.forEach(i -> {
437

  
438
			final LocalDate readDate = i.getLastRead().toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
439
			final long d = Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays();
579 440
			if (d > getExpiredDays()) {
580
				if (currentUsedId.equals(currentId) == false) {
581
					db.getCollection(currentUsedId).drop();
582
					db.getCollection(MDStoreConstants.DISCARDED_PREFIX + currentUsedId).drop();
583
					log.debug("deleted collection " + currentUsedId);
441
				if (!i.getId().equals(currentInfo.getCurrentId())) {
442
					doDrop(i.getId());
584 443
				}
585
				toRemove.add(currentExpiredStore);
586 444
			}
587
		}
588
		delete(expiring, toRemove);
589
		log.debug("Deleted expired " + toRemove.size() + "collections, mdStore Id:" + currentObject.get(MDStoreConstants.MDID));
590
	}
445
		});
591 446

  
592
	/**
593
	 * Gets the expiring days.
594
	 *
595
	 * @param value     the value
596
	 * @param paramName the param name
597
	 * @return the expiring days
598
	 */
599
	private long getExpiringDays(final DBObject value, final String paramName) {
600

  
601
		final Date lastRead = (Date) value.get(paramName);
602

  
603
		final LocalDate readDate = lastRead.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
604
		return Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays();
447
		log.debug("Deleted expired collections, mdId: " + currentInfo.getMdId());
605 448
	}
606 449

  
607 450
	/**
608
	 * Gets the expired days.
609
	 *
610
	 * @return the expiredDays
611
	 */
612
	public int getExpiredDays() {
613
		if (this.expiredDays == 0) return 3;
614
		return expiredDays;
615
	}
616

  
617
	/**
618
	 * Sets the expired days.
619
	 *
620
	 * @param expiredDays the expiredDays to set
621
	 */
622
	public void setExpiredDays(final int expiredDays) {
623
		this.expiredDays = expiredDays;
624
	}
625

  
626
	/**
627 451
	 * Update incremental.
628 452
	 *
629 453
	 * @param transactionId the transaction id
630 454
	 * @param currentId     the current id
631 455
	 */
632 456
	private void updateIncremental(final String transactionId, final String currentId) {
633
		final MongoCollection<DBObject> transaction = db.getCollection(transactionId, DBObject.class);
634
		final MongoCollection<DBObject> mdstore = db.getCollection(currentId, DBObject.class);
635
		final FindIterable<DBObject> it = transaction.find().noCursorTimeout(true);
636
		for (DBObject currentObj : it) {
457
		final MongoCollection<MDStoreRecord> transaction = db.getCollection(transactionId, MDStoreRecord.class);
458
		final MongoCollection<MDStoreRecord> mdstore = db.getCollection(currentId, MDStoreRecord.class);
637 459

  
638
			BasicDBObject newObj = new BasicDBObject();
460
		for (MDStoreRecord record : transaction.find().noCursorTimeout(true)) {
639 461

  
640
			final String id = (String) currentObj.get(MDStoreConstants.ID);
641
			final String body = (String) currentObj.get(MDStoreConstants.BODY);
642
			newObj.put(MDStoreConstants.ID, id);
643
			newObj.put(MDStoreConstants.BODY, body);
644
			if (StringUtils.isNotBlank(id)) {
462
			final MDStoreRecord r = record.clone();
463
			if (StringUtils.isNotBlank(r.getId())) {
645 464
				//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
646 465
				//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
647
				final MongoCollection<DBObject> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
648
				mdstoreWrite.replaceOne(new BasicDBObject(MDStoreConstants.ID, id), newObj, new UpdateOptions().upsert(true));
466
				final MongoCollection<MDStoreRecord> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
467
				mdstoreWrite.replaceOne(Filters.eq(ID, r.getId()), r, new UpdateOptions().upsert(true));
649 468
			}
650 469
		}
651 470
	}
652 471

  
653
	/*
472
	/**
654 473
	 * (non-Javadoc)
655 474
	 *
656
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
475
	 * @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
657 476
	 */
658 477
	@Override
659 478
	public Boolean dropTransaction(final String mdId, final String idToDrop) throws  MDStoreException {
660 479
		verifyConsistency();
661
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
662
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdId);
663
		return dropStore(mdstoreInfo, idToDrop, MDStoreConstants.TRANSACTIONS);
480
		final MDStoreManagerInfo managerInfo = getInfoForCurrentMdStore(mdId);
481
		managerInfo.getTransactions().removeIf(info -> {
482
			if (idToDrop.equals(info.getId())) {
483
				doDrop(idToDrop);
484
				return true;
485
			} else {
486
				return false;
487
			}
488
		});
489
		getManagerTable().findOneAndReplace(Filters.eq(MDID, managerInfo.getMdId()), managerInfo);
490
		return true;
664 491
	}
665 492

  
666 493
	public void garbageTransactionsOnStart() throws  MDStoreException {
667 494
		verifyConsistency();
668
		FindIterable<DBObject> it = this.managerTable.find();
669
		for (DBObject currentObject : it) {
670
			final BasicDBList transactions = (BasicDBList) currentObject.get(MDStoreConstants.TRANSACTIONS);
671
			if ((transactions != null) && (transactions.size() > 0)) {
672
				for (int i = 0; i < transactions.size(); i++) {
673
					final DBObject currentTransactions = (DBObject) transactions.get(i);
674
					final String id = (String) currentTransactions.get(MDStoreConstants.ID);
675
					db.getCollection(id).drop();
676
					db.getCollection(MDStoreConstants.DISCARDED_PREFIX + id).drop();
677
					log.debug("deleted collection " + id);
678
				}
679
				currentObject.put(MDStoreConstants.TRANSACTIONS, new BasicDBList());
680
				this.getManagerTable().findOneAndReplace(new BasicDBObject(
681
						MDStoreConstants.INTERNAL_MONGO_ID, currentObject.get(MDStoreConstants.INTERNAL_MONGO_ID)), currentObject);
682
			}
495
		for (MDStoreManagerInfo info : getManagerTable().find()) {
496
			info.getTransactions().forEach(tx -> doDrop(tx.getId()));
497
			info.setTransactions(Lists.newArrayList());
498
			getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
683 499
		}
684 500
	}
685 501

  
......
701 517
		this.maxTransactions = maxTransactions;
702 518
	}
703 519

  
520
	/**
521
	 * Gets the expired days.
522
	 *
523
	 * @return the expiredDays
524
	 */
525
	public int getExpiredDays() {
526
		return expiredDays;
527
	}
528

  
529
	/**
530
	 * Sets the expired days.
531
	 *
532
	 * @param expiredDays the expiredDays to set
533
	 */
534
	public void setExpiredDays(final int expiredDays) {
535
		this.expiredDays = expiredDays;
536
	}
704 537
}

Also available in: Unified diff