Project

General

Profile

« Previous | Next » 

Revision 55336

[maven-release-plugin] copy for tag cnr-mongo-mdstore-6.0.3

View differences:

modules/cnr-mongo-mdstore/tags/cnr-mongo-mdstore-6.0.3/pom.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3
	<parent>
4
		<groupId>eu.dnetlib</groupId>
5
		<artifactId>dnet45-parent</artifactId>
6
		<version>1.0.0</version>
7
	</parent>
8
	<modelVersion>4.0.0</modelVersion>
9
	<groupId>eu.dnetlib</groupId>
10
	<artifactId>cnr-mongo-mdstore</artifactId>
11
	<packaging>jar</packaging>
12
	<version>6.0.3</version>
13
	<scm>
14
		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/cnr-mongo-mdstore/tags/cnr-mongo-mdstore-6.0.3</developerConnection>
15
	</scm>
16
	<dependencies>
17
		<dependency>
18
			<groupId>junit</groupId>
19
			<artifactId>junit</artifactId>
20
			<version>${junit.version}</version>
21
			<scope>test</scope>
22
		</dependency>
23
		<dependency>
24
			<groupId>org.springframework</groupId>
25
			<artifactId>spring-test</artifactId>
26
			<version>${spring.version}</version>
27
			<scope>test</scope>
28
		</dependency>
29
		<dependency>
30
			<groupId>org.mockito</groupId>
31
			<artifactId>mockito-core</artifactId>
32
			<version>${mockito.version}</version>
33
			<scope>test</scope>
34
		</dependency>
35
		<dependency>
36
			<groupId>eu.dnetlib</groupId>
37
			<artifactId>cnr-test-utils</artifactId>
38
			<version>[1.0.0,2.0.0)</version>
39
			<scope>test</scope>
40
		</dependency>
41
		<dependency>
42
			<groupId>eu.dnetlib</groupId>
43
			<artifactId>cnr-modular-mdstore-service</artifactId>
44
			<version>[6.0.0,7.0.0)</version>
45
		</dependency>
46
		<dependency>
47
			<groupId>eu.dnetlib</groupId>
48
			<artifactId>cnr-inspector</artifactId>
49
			<version>[1.0.0,2.0.0)</version>
50
		</dependency>
51
		<dependency>
52
			<groupId>org.mongodb</groupId>
53
			<artifactId>mongo-java-driver</artifactId>
54
			<version>${mongodb.driver.version}</version>
55
		</dependency>
56
		<dependency>
57
			<groupId>com.google.code.gson</groupId>
58
			<artifactId>gson</artifactId>
59
			<version>2.2</version>
60
		</dependency>
61
		<dependency>
62
			<groupId>joda-time</groupId>
63
			<artifactId>joda-time</artifactId>
64
			<version>2.3</version>
65
		</dependency>
66
		<dependency>
67
			<groupId>com.ximpleware</groupId>
68
			<artifactId>vtd-xml</artifactId>
69
			<version>[2.12, 3.0.0)</version>
70
		</dependency>
71

  
72
	</dependencies>
73
</project>
modules/cnr-mongo-mdstore/tags/cnr-mongo-mdstore-6.0.3/src/test/java/eu/dnetlib/data/mdstore/modular/mongodb/FeedSpeedTest.java
1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

  
3
import java.io.File;
4
import java.io.FileInputStream;
5
import java.io.IOException;
6
import java.util.Iterator;
7
import java.util.UUID;
8

  
9
import com.mongodb.DBObject;
10
import com.mongodb.client.MongoDatabase;
11
import eu.dnetlib.data.mdstore.MDStoreServiceException;
12
import eu.dnetlib.data.mdstore.modular.RecordParserFactory;
13
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
14
import org.apache.commons.io.IOUtils;
15
import org.junit.Before;
16
import org.junit.Ignore;
17
import org.junit.Test;
18
import org.junit.runner.RunWith;
19
import org.springframework.beans.factory.annotation.Autowired;
20
import org.springframework.core.io.ClassPathResource;
21
import org.springframework.test.context.ContextConfiguration;
22
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
23

  
24
@Ignore
25
@RunWith(SpringJUnit4ClassRunner.class)
26
@ContextConfiguration(classes = ConfigurationTestConfig.class)
27
public class FeedSpeedTest {
28

  
29
	private static final int N_RECORDS = 68271;
30

  
31
	@Autowired
32
	private MongoDatabase db;
33

  
34
	@Autowired
35
	private MDStoreDao dao;
36

  
37
	@Autowired
38
	private RecordParserFactory recordParserfactory;
39

  
40
	@Before
41
	public void setup() throws MDStoreServiceException {
42
		dao.createMDStore("speed_test", "testFormat", "testInterpretation", "testLayout");
43
	}
44

  
45

  
46
	@Test
47
	public void testSpeedFromFolder() throws IOException {
48
		Iterable<String> iterable = new Iterable<String>() {
49

  
50
			private int counter = 0;
51
			private double last = System.currentTimeMillis();
52

  
53
			@Override
54
			public Iterator<String> iterator() {
55
				return new Iterator<String>() {
56

  
57
					@Override
58
					public boolean hasNext() {
59
						return counter < N_RECORDS;
60
					}
61

  
62
					@Override
63
					public String next() {
64
						if (counter % 10000 == 0) {
65
							System.out.println("10K records processed in " + (System.currentTimeMillis() - last) / 1000 + " seconds");
66
							last = System.currentTimeMillis();
67
						}
68

  
69
						File f = new File(String.format("/var/lib/eagle/content/EDH/HD%06d.xml", counter++));
70
						if (f.exists()) {
71
							try {
72
								FileInputStream fileInputStream = new FileInputStream(f);
73
								String s = IOUtils.toString(fileInputStream);
74
								fileInputStream.close();
75
								return s;
76
							} catch (Exception e) {
77
								return null;
78
							}
79
						} else {
80
							try {
81
								FileInputStream fileInputStream = new FileInputStream(new File("/var/lib/eagle/content/EDH/HD000001.xml"));
82
								String s = IOUtils.toString(fileInputStream);
83
								fileInputStream.close();
84
								return s;
85
							} catch (Exception e) {
86
								return null;
87
							}
88
						}
89
					}
90

  
91
					@Override
92
					public void remove() {}
93
				};
94
			}
95
		};
96

  
97
		MongoMDStore mdStore =
98
				new MongoMDStore(UUID.randomUUID().toString(), db.getCollection("speed_test", DBObject.class), recordParserfactory.newInstance(), true, db);
99
		mdStore.feed(iterable, false);
100
	}
101

  
102
	//@Ignore
103
	@Test
104
	public void testFeedSpeedFromTemplate() throws MDStoreServiceException, IOException {
105
		MongoMDStore mdStore =
106
				new MongoMDStore(UUID.randomUUID().toString(), db.getCollection("speed_test", DBObject.class), recordParserfactory.newInstance(), false, db);
107
		mdStore.feed(new Iterable<String>() {
108

  
109
			private int counter = 0;
110
			private double last = System.currentTimeMillis();
111
			private String templateRecord = IOUtils.toString(new ClassPathResource("/eu/dnetlib/data/mdstore/modular/mongodb/templateRecord.xml")
112
					.getInputStream());
113

  
114
			@Override
115
			public Iterator<String> iterator() {
116
				return new Iterator<String>() {
117

  
118
					@Override
119
					public boolean hasNext() {
120
						return counter < N_RECORDS;
121
					}
122

  
123
					@Override
124
					public String next() {
125
						if (counter % 10000 == 0) {
126
							System.out.println("10K records processed in " + (System.currentTimeMillis() - last) / 1000 + " seconds");
127
							last = System.currentTimeMillis();
128
						}
129

  
130
						File f = new File(String.format("/var/lib/eagle/content/EDH/HD%06d.xml", counter++));
131
						if (f.exists()) {
132
							try {
133
								FileInputStream fileInputStream = new FileInputStream(f);
134
								String s = IOUtils.toString(fileInputStream);
135
								fileInputStream.close();
136
								return s;
137
							} catch (Exception e) {
138
								return null;
139
							}
140
						} else {
141
							counter++;
142
							try {
143
								FileInputStream fileInputStream = new FileInputStream(new File("/var/lib/eagle/content/EDH/HD000009.xml"));
144
								String s = IOUtils.toString(fileInputStream);
145
								fileInputStream.close();
146
								return s;
147
							} catch (Exception e) {
148
								return null;
149
							}
150
						}
151
					}
152

  
153
					@Override
154
					public void remove() {}
155
				};
156
			}
157
		}, false);
158
	}
159
}
modules/cnr-mongo-mdstore/tags/cnr-mongo-mdstore-6.0.3/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MDStoreTransactionManagerImpl.java
1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

  
3
import java.util.ArrayList;
4
import java.util.Date;
5
import java.util.List;
6

  
7
import com.google.common.collect.Lists;
8
import com.mongodb.BasicDBList;
9
import com.mongodb.BasicDBObject;
10
import com.mongodb.DBObject;
11
import com.mongodb.WriteConcern;
12
import com.mongodb.client.FindIterable;
13
import com.mongodb.client.MongoCollection;
14
import com.mongodb.client.MongoDatabase;
15
import com.mongodb.client.MongoIterable;
16
import com.mongodb.client.model.Filters;
17
import com.mongodb.client.model.IndexOptions;
18
import com.mongodb.client.model.UpdateOptions;
19
import eu.dnetlib.data.mdstore.MDStoreServiceException;
20
import eu.dnetlib.data.mdstore.modular.connector.*;
21
import eu.dnetlib.data.mdstore.modular.mongodb.utils.MDStoreUtils;
22
import org.apache.commons.lang3.StringUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.bson.conversions.Bson;
26
import org.joda.time.DateTime;
27
import org.joda.time.Days;
28
import org.springframework.beans.factory.annotation.Required;
29

  
30
/**
31
 * The Class MDStoreTransactionManager.
32
 */
33
public class MDStoreTransactionManagerImpl implements MDStoreTransactionManager {
34

  
35
	/** The Constant log. */
36
	private static final Log log = LogFactory.getLog(MDStoreTransactionManagerImpl.class);
37

  
38
	/**
39
	 * The table name.
40
	 */
41
	private static String TABLE_NAME = "metadataManager";
42

  
43
	/** The max number of concurrent transactions per mdstore. */
44
	private int maxTransactions = 1;
45

  
46
	/**
47
	 * The db.
48
	 */
49
	private MongoDatabase db;
50

  
51
	/**
52
	 * The manager table.
53
	 */
54
	private MongoCollection<DBObject> managerTable;
55

  
56
	/** The expired days. */
57
	private int expiredDays;
58

  
59
	private final IndexOptions options = new IndexOptions().background(true);
60

  
61
	/**
62
	 * Bootstrap manager.
63
	 */
64
	private void bootstrapManager() {
65
		log.debug("Bootstrap Manager start");
66
		final MongoCollection<DBObject> metadataColl = db.getCollection("metadata", DBObject.class);
67
		final FindIterable<DBObject> values = metadataColl.find();
68
		this.setManagerTable(db.getCollection(TABLE_NAME, DBObject.class));
69
		for (DBObject object : values) {
70
			final String id = (String) object.get("mdId");
71
			String newId = id;
72
			if (id.contains("_")) {
73
				newId = StringUtils.substringBefore(id, "_");
74
			}
75
			final BasicDBObject input = new BasicDBObject();
76
			input.put("mdId", id);
77
			input.put("currentId", newId);
78
			input.put("expiring", new String[] {});
79
			input.put("transactions", new String[] {});
80
			getManagerTable().insertOne(input);
81
			log.debug(String.format("Added %s to Metadata Manager data structure", id));
82

  
83
		}
84
		final BasicDBObject ensureIndex = new BasicDBObject();
85
		ensureIndex.put("mdId", 1);
86
		log.debug("Create index in MetadaManager ");
87
		this.getManagerTable().createIndex(ensureIndex, options);
88
	}
89

  
90
	/**
91
	 * Gets the DBObject describing an mdstore. null if there is no mdstore with the given id.
92
	 *
93
	 * @param mdstoreID the mdStore identifier
94
	 * @return DBObject or null
95
	 */
96
	private DBObject getMDStoreAsDBObject(final String mdstoreID) throws MDStoreServiceException {
97
		final BasicDBObject query = new BasicDBObject();
98
		query.put("mdId", mdstoreID);
99
		final FindIterable<DBObject> it = this.getManagerTable().find(query);
100
		return it.first();
101
	}
102

  
103
	/**
104
	 * Verify consistency.
105
	 *
106
	 * @throws MDStoreServiceException
107
	 *             the MD store service exception
108
	 */
109
	@Override
110
	public void verifyConsistency() throws MDStoreServiceException {
111
		if (this.getManagerTable() == null) {
112
			if (!Lists.newArrayList(db.listCollectionNames()).contains(TABLE_NAME))
113
				bootstrapManager();
114
			else {
115
				this.setManagerTable(db.getCollection(TABLE_NAME, DBObject.class));
116
			}
117
		}
118
		if (this.getManagerTable() == null) throw new MDStoreServiceException("Something bad happen, unable to create managerTable");
119
	}
120

  
121
	/**
122
	 * {@inheritDoc}
123
	 *
124
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#createMDStore(java.lang.String)
125
	 */
126
	@Override
127
	public void createMDStore(final String mdId) throws MDStoreServiceException {
128
		log.debug("Creating new mdstore");
129
		verifyConsistency();
130
		String newId = mdId;
131
		if (mdId.contains("_")) {
132
			newId = StringUtils.substringBefore(mdId, "_");
133
		}
134
		final BasicDBObject instance = new BasicDBObject();
135
		instance.put("mdId", mdId);
136
		instance.put("currentId", newId);
137
		instance.put("expiring", new String[] {});
138
		getManagerTable().insertOne(instance);
139
	}
140

  
141
	/**
142
	 * {@inheritDoc}
143
	 *
144
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropMDStore(java.lang.String)
145
	 */
146
	@Override
147
	public void dropMDStore(final String mdId) throws MDStoreServiceException {
148
		verifyConsistency();
149
		log.debug("Droping MDStore: " + mdId);
150
		final BasicDBObject query = new BasicDBObject();
151
		query.put("mdId", mdId);
152
		final DBObject dropped = this.getManagerTable().findOneAndDelete(query);
153
		garbage();
154
		final String collectionName = (String) dropped.get("currentId");
155
		this.db.getCollection(collectionName).drop();
156
		this.db.getCollection("discarded-" + collectionName).drop();
157
	}
158

  
159
	/**
160
	 * {@inheritDoc}
161
	 *
162
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#getMDStoreCollection(java.lang.String)
163
	 */
164
	@Override
165
	public String getMDStoreCollection(final String mdId) throws MDStoreServiceException {
166
		verifyConsistency();
167
		DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
168
		if (mdstoreInfo != null)
169
			return (String) mdstoreInfo.get("currentId");
170
		else return null;
171
	}
172

  
173
	/**
174
	 * {@inheritDoc}
175
	 *
176
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#startTransaction(java.lang.String, boolean)
177
	 */
178
	@Override
179
	public String startTransaction(final String mdId, final boolean refresh) throws MDStoreServiceException {
180
		verifyConsistency();
181
		log.info("Start transaction for metadata store " + mdId);
182
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
183
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
184
		String idCreation = StringUtils.substringBefore(mdId, "_");
185
		idCreation = idCreation + "::" + System.currentTimeMillis();
186

  
187
		BasicDBList values;
188
		if (mdstoreInfo.containsField("transactions")) {
189
			values = (BasicDBList) mdstoreInfo.get("transactions");
190
			if (values.size() > getMaxTransactions())
191
				throw new MDStoreServiceException("Cannot create more than " + getMaxTransactions() + " transactions, found: " + values.size() + ", mdId:"
192
						+ mdId);
193
		} else {
194
			values = new BasicDBList();
195
		}
196
		final BasicDBObject transactionMetadata = new BasicDBObject();
197
		transactionMetadata.put("id", idCreation);
198
		transactionMetadata.put("refresh", refresh);
199
		transactionMetadata.put("date", new Date());
200
		values.add(transactionMetadata);
201
		mdstoreInfo.put("transactions", values);
202
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
203
		return idCreation;
204
	}
205

  
206
	/**
207
	 * {@inheritDoc}
208
	 *
209
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#commit(java.lang.String, java.lang.String,
210
	 *      eu.dnetlib.data.mdstore.modular.connector.MDStore)
211
	 */
212
	@Override
213
	public boolean commit(final String transactionId, final String mdstoreId, final MDStore current) throws MDStoreServiceException {
214
		verifyConsistency();
215
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdstoreId);
216
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdstoreId);
217
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get("transactions");
218
		final DBObject transaction = findTransaction(transactions, transactionId);
219
		if (transaction == null) throw new MDStoreServiceException("Error, unable to find transaction with Id " + transactionId);
220
		final boolean refresh = (Boolean) transaction.get("refresh");
221
		transactions.remove(transaction);
222
		final String oldId = (String) mdstoreInfo.get("currentId");
223
		if (refresh) {
224
			mdstoreInfo.put("currentId", transactionId);
225
			final BasicDBList stillUsed = (BasicDBList) mdstoreInfo.get("expiring");
226
			if (stillUsed.size() == 0) {
227
				db.getCollection(oldId).drop();
228
				db.getCollection("discarded-" + oldId).drop();
229
			}
230
			log.debug("Replaced collection ");
231
		} else {
232
			log.debug("commit incremental ");
233
			updateIncremental(transactionId, oldId);
234
			db.getCollection(transactionId).drop();
235
			db.getCollection("discarded-" + transactionId).drop();
236
		}
237
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
238

  
239
		log.info("Committed transaction for metadata store " + mdstoreId);
240
		return true;
241
	}
242

  
243
	/**
244
	 * Find transaction.
245
	 *
246
	 * @param transactions
247
	 *            the transactions
248
	 * @param transactionId
249
	 *            the transaction id
250
	 * @return the DB object
251
	 */
252
	private DBObject findTransaction(final BasicDBList transactions, final String transactionId) {
253
		if (transactions.size() == 0) return null;
254
		for (Object tx : transactions) {
255
			final BasicDBObject transaction = (BasicDBObject) tx;
256
			final String id = (String) transaction.get("id");
257
			if (transactionId.equals(id)) return transaction;
258
		}
259
		return null;
260

  
261
	}
262

  
263
	/**
264
	 * Gets the db.
265
	 *
266
	 * @return the db
267
	 */
268
	public MongoDatabase getDb() {
269
		return db;
270
	}
271

  
272
	/**
273
	 * Sets the db.
274
	 *
275
	 * @param db
276
	 *            the db to set
277
	 */
278
	@Required
279
	public void setDb(final MongoDatabase db) {
280
		this.db = db;
281
	}
282

  
283
	/**
284
	 * {@inheritDoc}
285
	 *
286
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#readMdStore(java.lang.String)
287
	 */
288
	@Override
289
	public String readMdStore(final String mdStoreId) throws MDStoreServiceException {
290
		verifyConsistency();
291
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
292
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdStoreId);
293
		final String currentId = (String) mdstoreInfo.get("currentId");
294
		final BasicDBList values = (BasicDBList) mdstoreInfo.get("expiring");
295
		updateMdstoreUsed(values, currentId);
296
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
297
		return currentId;
298

  
299
	}
300

  
301

  
302
	/**
303
	 * Update mdstore used.
304
	 *
305
	 * @param values
306
	 *            the values
307
	 * @param mdId
308
	 *            the md id
309
	 */
310
	private void updateMdstoreUsed(final BasicDBList values, final String mdId) {
311
		if (values.size() > 0) {
312
			for (Object value : values) {
313
				final DBObject obj = (DBObject) value;
314
				final String id = (String) obj.get("id");
315
				if (mdId.equals(id)) {
316
					obj.put("lastRead", new Date());
317
					return;
318
				}
319
			}
320
		}
321
		final BasicDBObject readStore = new BasicDBObject();
322
		readStore.put("id", mdId);
323
		readStore.put("lastRead", new Date());
324
		values.add(readStore);
325
	}
326

  
327
	/**
328
	 * Gets the manager table.
329
	 *
330
	 * @return the managerTable
331
	 */
332
	public MongoCollection<DBObject> getManagerTable() {
333
		return managerTable;
334
	}
335

  
336
	/**
337
	 * Sets the manager table.
338
	 *
339
	 * @param managerTable
340
	 *            the managerTable to set
341
	 */
342
	public void setManagerTable(final MongoCollection<DBObject> managerTable) {
343
		this.managerTable = managerTable;
344
	}
345

  
346
	/*
347
	 * (non-Javadoc)
348
	 *
349
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
350
	 */
351
	@Override
352
	public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdStoreId) throws MDStoreServiceException {
353
		verifyConsistency();
354
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
355
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdStoreId);
356
		final MDStoreManagerInfo result = new MDStoreManagerInfo();
357
		result.setCurrentId((String) mdstoreInfo.get("currentId"));
358
		result.setMdId((String) mdstoreInfo.get("mdId"));
359
		final BasicDBList values = (BasicDBList) mdstoreInfo.get("expiring");
360
		for (Object v : values) {
361
			final MDStoreExpiredInfo stillused = new MDStoreExpiredInfo();
362
			final DBObject value = (DBObject) v;
363
			stillused.setId((String) value.get("id"));
364
			stillused.setLastRead((Date) value.get("lastRead"));
365
			result.addExpiredItem(stillused);
366
		}
367
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get("transactions");
368
		if (transactions != null) {
369
			for (Object tx : transactions) {
370
				final MDStoreTransactionInfo transaction = new MDStoreTransactionInfo();
371
				final DBObject value = (DBObject) tx;
372
				final String transactionId = (String) value.get("id");
373
				transaction.setId(transactionId);
374
				transaction.setDate((Date) value.get("date"));
375
				transaction.setRefresh((Boolean) value.get("refresh"));
376
				transaction.setSize(db.getCollection(transactionId).count());
377
				result.addTransactionInfo(transaction);
378
			}
379
		}
380
		return result;
381
	}
382

  
383
	/*
384
	 * (non-Javadoc)
385
	 *
386
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
387
	 */
388
	@Override
389
	public Boolean dropUsed(final String mdId, final String idToDrop) throws MDStoreServiceException {
390
		verifyConsistency();
391
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
392
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
393
		return dropStore(mdstoreInfo, idToDrop, "expiring");
394
	}
395

  
396
	private boolean dropStore(DBObject mdstoreInfo, final String idToDrop, String transactionListName) throws MDStoreServiceException {
397
		final BasicDBList transactionList = (BasicDBList) mdstoreInfo.get(transactionListName);
398
		for (int i = 0; i < transactionList.size(); i++) {
399
			final DBObject value = (DBObject) transactionList.get(i);
400
			final String currentUsedId = (String) value.get("id");
401
			if (currentUsedId.equals(idToDrop)) {
402
				db.getCollection(idToDrop).drop();
403
				db.getCollection("discarded-" + idToDrop).drop();
404
				transactionList.remove(value);
405
				mdstoreInfo.put(transactionListName, transactionList);
406
				this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
407
				return true;
408
			}
409
		}
410
		throw new MDStoreServiceException("Error, unable to drop collection " + idToDrop);
411
	}
412

  
413
	/*
414
	 * (non-Javadoc)
415
	 *
416
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#garbage()
417
	 */
418
	@Override
419
	public void garbage() throws MDStoreServiceException {
420
		verifyConsistency();
421
		log.info("Start garbage collection of MdStore");
422
		final FindIterable<DBObject> it = this.managerTable.find();
423
		int totalDeleted = 0;
424
		for (DBObject currentObject :  it){
425
			if (log.isDebugEnabled()) {
426
				log.debug("start to check id: " + currentObject.get("currentId"));
427
			}
428
			garbageExpiring(currentObject, (String) currentObject.get("currentId"));
429
			garbageTransactions(currentObject, (String) currentObject.get("currentId"));
430
			this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", currentObject.get("_id")), currentObject);
431
		}
432

  
433
		// DELETING Collection that are not in the metadataManager table
434
		MongoIterable<String> collections = this.db.listCollectionNames();
435
		for (String collection : collections) {
436
			if ((collection.length() > 30) && (!collection.contains("discarded-"))) {
437
				final DBObject item = getMetadataObjectForCollections(collection);
438

  
439
				if (shouldDelete(collection, item)) {
440
					if (log.isDebugEnabled()) {
441
						log.debug("delete collection: " + collection + " from mongo");
442
					}
443
					db.getCollection(collection).drop();
444
					db.getCollection("discarded-" + collection).drop();
445
					if (log.isDebugEnabled()) {
446
						log.debug("delete collection: discarded-" + collection + " from mongo");
447
					}
448
				}
449
			}
450
		}
451

  
452
		log.info("Complete garbage collection of MdStore, total store deleted: " + totalDeleted);
453
	}
454

  
455
	private DBObject getMetadataObjectForCollections(final String collectionName) {
456
		if (collectionName == null) return null;
457
		final String postfix = "_TURTdG9yZURTUmVzb3VyY2VzL01EU3RvcmVEU1Jlc291cmNlVHlwZQ==";
458
		final String tmp = collectionName.contains("discarded-") ? StringUtils.substringAfter(collectionName, "discarded-") : collectionName;
459
		final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + postfix;
460

  
461
		//DBObject query = QueryBuilder.start("mdId").is(collectionNameCleaned).get();
462
		Bson query = Filters.eq("mdId", collectionNameCleaned);
463
		return this.managerTable.find(query).first();
464

  
465
	}
466

  
467
	private boolean shouldDelete(final String collectionName, final DBObject metadataManagerInstance) {
468
		log.debug("should delete instance "+metadataManagerInstance);
469
		if((metadataManagerInstance== null) || (metadataManagerInstance.get("currentId")== null)){
470
			log.debug("the instance has not currentID");
471
			return true;
472
		}
473
		String currentId = (String) metadataManagerInstance.get("currentId");
474
		if (collectionName.equals(currentId)) return false;
475
		BasicDBList expiringList = (BasicDBList) metadataManagerInstance.get("expiring");
476
		if (findInList(expiringList, collectionName, "id")) return false;
477
		BasicDBList transactionsList = (BasicDBList) metadataManagerInstance.get("transactions");
478
		return !findInList(transactionsList, collectionName, "id");
479
	}
480

  
481
	private boolean findInList(final BasicDBList list, final String object, final String tagname) {
482
		if (list == null) return false;
483
		for (Object o : list) {
484
			DBObject currentObject = (DBObject) o;
485
			final String id = (String) currentObject.get(tagname);
486
			if (id.equals(object)) return true;
487
		}
488
		return false;
489
	}
490

  
491
	/**
492
	 * Delete.
493
	 *
494
	 * @param list
495
	 *            the list
496
	 * @param toRemove
497
	 *            the to remove
498
	 */
499
	private void delete(final BasicDBList list, final List<DBObject> toRemove) {
500

  
501
		for (final DBObject obj : toRemove) {
502
			if (log.isDebugEnabled()) {
503
				log.debug("deleting " + obj);
504
			}
505
			list.remove(obj);
506
		}
507
	}
508

  
509
	/**
510
	 * Garbage transactions.
511
	 *
512
	 * @param currentObject
513
	 *            the current object
514
	 * @param currentId
515
	 *            the current id
516
	 */
517
	private void garbageTransactions(final DBObject currentObject, final String currentId) {
518
		if (log.isDebugEnabled()) {
519
			log.debug("Start garbage transactions ");
520
		}
521

  
522
		final BasicDBList expiring = (BasicDBList) currentObject.get("transactions");
523
		if ((expiring == null) || (expiring.size() <= getMaxTransactions())) return;
524

  
525
		List<DBObject> expiringList = Lists.newArrayList();
526

  
527
		for (Object o : expiring) {
528
			final DBObject cobj = (DBObject) o;
529
			if (cobj != null) {
530
				expiringList.add((DBObject) o);
531
			}
532

  
533
		}
534

  
535
		expiringList.sort(MDStoreUtils.getComparatorOnDate());
536

  
537
		List<DBObject> toRemove = Lists.newArrayList();
538
		int i = 0;
539

  
540
		// We should remove the k item less recent
541
		// where k = numberOftotalTransaction - maxNumberOfTransaction
542
		// k = numberOfItemToRemove
543

  
544
		while (((expiringList.size() - toRemove.size()) > getMaxTransactions()) || (i < expiringList.size())) {
545
			DBObject currentObj = expiringList.get(i++);
546
			String objectId = (String) currentObj.get("id");
547
			if (!objectId.equals(currentId)) {
548
				if (log.isDebugEnabled()) {
549
					log.debug("delete collection: " + objectId + " from mongo");
550
				}
551
				db.getCollection(objectId).drop();
552
				db.getCollection("discarded-" + objectId).drop();
553
				if (log.isDebugEnabled()) {
554
					log.debug("delete collection: discarded-" + objectId + " from mongo");
555
				}
556
				toRemove.add(currentObj);
557
			} else {
558
				if (log.isDebugEnabled()) {
559
					log.debug("Cannot remove transaction " + objectId + " because is the currentId: " + currentId);
560
				}
561
			}
562
		}
563

  
564
		delete(expiring, toRemove);
565
		log.info("Deleted " + toRemove.size() + " transactions, mdStore Id:" + currentObject.get("mdId"));
566
	}
567

  
568
	/**
569
	 * Garbage expiring.
570
	 *
571
	 * @param currentObject
572
	 *            the current object
573
	 * @param currentId
574
	 *            the current id
575
	 */
576
	private void garbageExpiring(final DBObject currentObject, final String currentId) {
577
		if (log.isDebugEnabled()) {
578
			log.debug("Start to search expiring mdstores for id: " + currentObject.get("mdId"));
579
		}
580
		final BasicDBList expiring = (BasicDBList) currentObject.get("expiring");
581
		final List<DBObject> toRemove = Lists.newArrayList();
582
		if (log.isDebugEnabled()) {
583
			if (expiring == null) {
584
				log.debug("expiring list is null");
585
			} else {
586
				log.debug("expiring list size is :" + expiring.size());
587
			}
588
		}
589
		if ((expiring == null) || (expiring.size() == 0)) {
590
			log.debug("Deleted  0  expired  collections, mdStore Id:" + currentObject.get("mdId"));
591
			return;
592
		}
593
		for (Object anExpiring : expiring) {
594
			final DBObject currentExpiredStore = (DBObject) anExpiring;
595
			final String currentUsedId = (String) currentExpiredStore.get("id");
596
			final Days d = getExpiringDays(currentExpiredStore, "lastRead");
597
			if (log.isDebugEnabled()) {
598
				log.debug("the store :" + currentId + " expired since " + d.getDays() + "days ");
599
			}
600
			// DELETE the collection where the last time they was read
601
			// is more than 3 days ago
602
			if (d.getDays() > getExpiredDays()) {
603
				if (!currentUsedId.equals(currentId)) {
604
					db.getCollection(currentUsedId).drop();
605
					db.getCollection("discarded-" + currentUsedId).drop();
606
					log.debug("deleted collection " + currentUsedId);
607
				}
608
				toRemove.add(currentExpiredStore);
609
			}
610
		}
611
		delete(expiring, toRemove);
612
		log.debug("Deleted expired " + toRemove.size() + "collections, mdStore Id:" + currentObject.get("mdId"));
613
	}
614

  
615
	/**
616
	 * Gets the expiring days.
617
	 *
618
	 * @param value
619
	 *            the value
620
	 * @param paramName
621
	 *            the param name
622
	 * @return the expiring days
623
	 */
624
	private Days getExpiringDays(final DBObject value, final String paramName) {
625
		final Date lastRead = (Date) value.get(paramName);
626
		final DateTime last = new DateTime(lastRead);
627
		final DateTime today = new DateTime();
628
		final Days d = Days.daysBetween(last, today);
629
		return d;
630
	}
631

  
632
	/**
633
	 * Gets the expired days.
634
	 *
635
	 * @return the expiredDays
636
	 */
637
	public int getExpiredDays() {
638
		if (this.expiredDays == 0) return 3;
639
		return expiredDays;
640
	}
641

  
642
	/**
643
	 * Sets the expired days.
644
	 *
645
	 * @param expiredDays
646
	 *            the expiredDays to set
647
	 */
648
	public void setExpiredDays(final int expiredDays) {
649
		this.expiredDays = expiredDays;
650
	}
651

  
652
	/**
653
	 * Update incremental.
654
	 *
655
	 * @param transactionId
656
	 *            the transaction id
657
	 * @param currentId
658
	 *            the current id
659
	 */
660
	private void updateIncremental(final String transactionId, final String currentId) {
661
		final MongoCollection<DBObject> transaction = db.getCollection(transactionId, DBObject.class);
662
		final MongoCollection<DBObject> mdstore = db.getCollection(currentId, DBObject.class);
663
		final FindIterable<DBObject> it = transaction.find().noCursorTimeout(true);
664
		for (DBObject currentObj : it) {
665
			final String id = (String) currentObj.get("id");
666
			DBObject newObj = new BasicDBObject(currentObj.toMap());
667
			//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
668
			//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
669
			final MongoCollection<DBObject> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
670
			mdstoreWrite.replaceOne(new BasicDBObject("id", id), newObj, new UpdateOptions().upsert(true));
671
		}
672
	}
673

  
674
	/*
675
	 * (non-Javadoc)
676
	 *
677
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
678
	 */
679
	@Override
680
	public Boolean dropTransaction(final String mdId, final String idToDrop) throws MDStoreServiceException {
681
		verifyConsistency();
682
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
683
		if (mdstoreInfo == null) throw new MDStoreServiceException("Error, unable to find Mdstore with Id " + mdId);
684
		return dropStore(mdstoreInfo, idToDrop, "transactions");
685
	}
686

  
687
	public void garbageTransactionsOnStart() throws MDStoreServiceException {
688
		verifyConsistency();
689
		FindIterable<DBObject> it = this.managerTable.find();
690

  
691
		final List<String> currentMdStoreId = Lists.newArrayList();
692
		for (DBObject currentObject : it){
693
			currentMdStoreId.add((String) currentObject.get("currentId"));
694
			final BasicDBList transactions = (BasicDBList) currentObject.get("transactions");
695
			if ((transactions != null) && (transactions.size() > 0)) {
696
				for (Object tx : transactions) {
697
					final DBObject currentTransactions = (DBObject) tx;
698
					final String id = (String) currentTransactions.get("id");
699
					db.getCollection(id).drop();
700
					db.getCollection("discarded-" + id).drop();
701
					log.debug("deleted collection " + id);
702
				}
703
				currentObject.put("transactions", new BasicDBList());
704
				this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", currentObject.get("_id")), currentObject);
705
			}
706
		}
707

  
708
		//DELETING ALL THE DISCARDED COLLECTION THAT DISCARDED COLLECTION OF THE CURRENT MDSTORE
709
		final ArrayList<String> collectionsNames = Lists.newArrayList(db.listCollectionNames());
710

  
711
		for (String item : collectionsNames) {
712
			if (item.startsWith("discarded-")) {
713
				final String currentCollection = StringUtils.substringAfter(item, "discarded-");
714
				if (!currentMdStoreId.contains(currentCollection)) {
715
					log.info("Deleting discarded collection :" + item);
716
					this.db.getCollection(item).drop();
717
				}
718
			}
719
		}
720
	}
721

  
722
	/**
723
	 * Gets the max transactions.
724
	 *
725
	 * @return the maxTransactions
726
	 */
727
	public int getMaxTransactions() {
728
		return maxTransactions;
729
	}
730

  
731
	/**
732
	 * Sets the max transactions.
733
	 *
734
	 * @param maxTransactions
735
	 *            the maxTransactions to set
736
	 */
737
	public void setMaxTransactions(final int maxTransactions) {
738
		this.maxTransactions = maxTransactions;
739
	}
740
}
modules/cnr-mongo-mdstore/tags/cnr-mongo-mdstore-6.0.3/src/test/resources/eu/dnetlib/data/mdstore/modular/mongodb/inputRecord.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<record xmlns:dc="http://purl.org/dc/elements/1.1/"
3
        xmlns:dr="http://www.driver-repository.eu/namespace/dr"
4
        xmlns:dri="http://www.driver-repository.eu/namespace/dri"
5
        xmlns:oaf="http://namespace.openaire.eu/oaf"
6
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
7
    <header xmlns="http://namespace.openaire.eu/">
8
        <dri:objIdentifier>od______2367::0001a50c6388e9bfcb791a924ec4b837</dri:objIdentifier>
9
        <dri:recordIdentifier>oai:pumaoai.isti.cnr.it:cnr.imati/cnr.ian.pv/1999-PP-018</dri:recordIdentifier>
10
        <dri:dateOfCollection/>
11
        <dri:mdFormat/>
12
        <dri:mdFormatInterpretation/>
13
        <dri:repositoryId>
14
            10d18b66-1d2a-4579-9adc-aa57b0821c7f_UmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZXMvUmVwb3NpdG9yeVNlcnZpY2VSZXNvdXJjZVR5cGU=
15
        </dri:repositoryId>
16
        <dr:objectIdentifier/>
17
        <dr:dateOfCollection>2016-11-22T09:15:30.817Z</dr:dateOfCollection>
18
        <dr:dateOfTransformation>2016-11-22T09:18:15.904Z</dr:dateOfTransformation>
19
        <oaf:datasourceprefix>od______2367</oaf:datasourceprefix>
20
    </header>
21
    <metadata xmlns="http://namespace.openaire.eu/">
22
        <dc:title>Anisotropic mechanisms for multiphasic unipolar electrograms. Simulation studies and experimental
23
            recordings
24
        </dc:title>
25
        <dc:creator>Colli Franzone, Piero,</dc:creator>
26
        <dc:creator>Guerri, Luciano,</dc:creator>
27
        <dc:creator>Pennacchio, Micol,</dc:creator>
28
        <dc:creator>Taccardi, Bruno</dc:creator>
29
        <dc:date>1999-09-30</dc:date>
30
        <dc:description>The origin of the multiple, complex morphologies observed in
31
            unipolar epicardial electrograms, and their relationships with myocardial
32
            architecture, have not been fully elucidated. To clarify this problem we
33
            simulated electrograms (EGs) with a model representing the heart as an
34
            anisotropic bidomain with unequal anisotropy ratio, ellipsoidal ventricular
35
            geometry, transmural fiber rotation, epi-endocardial obliqueness of fiber
36
            direction and a simplified conduction system. The electrograms were compared
37
            with those directly recorded from the surface of isolated dog hearts immersed
38
            in a conducting medium. The model accurately reproduced the recorded EG
39
            morphologies for excitation wave fronts that reach the recording sites by
40
            spreading either along or across fibers.The
41
            origin of the multiple waves that constitute the QRS complex could be better
42
            understood after splitting the current sources, the potential distributions and
43
            the EGs into a field component (further subdivided into an axial and a conormal component) and a "reference"
44
            component. The split model provides an explanation of the interaction between the three-dimensional geometry
45
            and direction of
46
            propagation of a spreading wave front, the architecture of the fibers through
47
            which excitation is spreading, the potential distributions and the QRS wave
48
            forms. Because epicardial potentials, electrograms and isochrone contours can be
49
            computed noninvasively from body surface measurements, interpreting epicardial
50
            EGs in terms of intramural events may have clinical relevance.
51
        </dc:description>
52
        <dc:identifier>http://puma.isti.cnr.it/dfdownloadnew.php?ident=cnr.imati/cnr.ian.pv/1999-PP-018</dc:identifier>
53
        <dc:identifier>
54
            http://puma.isti.cnr.it/rmydownload.php?filename=cnr.imati/cnr.ian.pv/1999-PP-018/1999-PP-018_0.ps
55
        </dc:identifier>
56
        <dc:language>eng</dc:language>
57
        <dc:source>Preprint ercim.cnr.ian//1999-1151, 1999.</dc:source>
58
        <dc:subject>Electrograms, bidomain model, reference potential, cardiac potential maps, anisotropic propagation,
59
            source splitting
60
        </dc:subject>
61
        <dc:subject>info:eu-repo/classification/msc/78A70,65N30</dc:subject>
62
        <dc:rights>info:eu-repo/semantics/openAccess</dc:rights>
63
        <dc:type>info:eu-repo/semantics/preprint</dc:type>
64
        <dr:CobjCategory>0016</dr:CobjCategory>
65
        <dr:CobjIdentifier/>
66
        <oaf:dateAccepted>1999-09-30</oaf:dateAccepted>
67
        <oaf:collectedDatasourceid>opendoar____::2367</oaf:collectedDatasourceid>
68
        <oaf:accessrights>OPEN</oaf:accessrights>
69
        <oaf:hostedBy id="opendoar____::2367" name="PUblication MAnagement"/>
70
        <oaf:collectedFrom id="opendoar____::2367" name="PUblication MAnagement"/>
71
    </metadata>
72
    <about xmlns:oai="http://www.openarchives.org/OAI/2.0/" xmlns="http://namespace.openaire.eu/">
73
        <provenance xmlns="http://www.openarchives.org/OAI/2.0/provenance"
74
                    xsi:schemaLocation="http://www.openarchives.org/OAI/2.0/provenance http://www.openarchives.org/OAI/2.0/provenance.xsd">
75
            <originDescription altered="true" harvestDate="2016-11-22T09:15:30.817Z">
76
                <baseURL>http://pumaoai.isti.cnr.it/openoai2.php</baseURL>
77
                <identifier>oai:pumaoai.isti.cnr.it:cnr.imati/cnr.ian.pv/1999-PP-018</identifier>
78
                <datestamp>1999-10-08</datestamp>
79
                <metadataNamespace>http://www.openarchives.org/OAI/2.0/oai_dc/</metadataNamespace>
80
            </originDescription>
81
        </provenance>
82
        <oaf:datainfo>
83
            <oaf:inferred>false</oaf:inferred>
84
            <oaf:deletedbyinference>false</oaf:deletedbyinference>
85
            <oaf:trust>0.9</oaf:trust>
86
            <oaf:inferenceprovenance/>
87
            <oaf:provenanceaction classid="sysimport:crosswalk:repository"
88
                                  classname="sysimport:crosswalk:repository"
89
                                  schemeid="dnet:provenanceActions" schemename="dnet:provenanceActions"/>
90
        </oaf:datainfo>
91
    </about>
92
</record>
modules/cnr-mongo-mdstore/tags/cnr-mongo-mdstore-6.0.3/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoBulkWritesManager.java
1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

  
3
import java.util.List;
4
import java.util.Map;
5
import javax.annotation.Nullable;
6

  
7
import com.google.common.base.Predicate;
8
import com.google.common.collect.Lists;
9
import com.google.common.collect.Maps;
10
import com.mongodb.BasicDBObject;
11
import com.mongodb.DBObject;
12
import com.mongodb.WriteConcern;
13
import com.mongodb.client.MongoCollection;
14
import com.mongodb.client.model.*;
15
import eu.dnetlib.data.mdstore.MDStoreServiceException;
16
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
17
import eu.dnetlib.data.mdstore.modular.RecordParser;
18
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser;
19
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParserException;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22

  
23
import static eu.dnetlib.data.mdstore.modular.MDStoreConstants.*;
24

  
25
public class MongoBulkWritesManager {
26

  
27
	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
28
	private final boolean discardRecords;
29
	private final boolean indexRecords;
30
	private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
31
	private final List<MDFormatDescription> mdref;
32
	private RecordParser recordParser;
33
	private MongoCollection<DBObject> validCollection;
34
	private List<WriteModel<DBObject>> validBulkOperationList;
35
	private BulkWriteOptions writeOptions;
36
	private MongoCollection<DBObject> discardedCollection;
37
	private List<WriteModel<DBObject>> discardedBulkOperationList;
38
	private int bulkSize;
39

  
40
	public MongoBulkWritesManager(final MongoCollection<DBObject> collection,
41
			final MongoCollection<DBObject> discardedCollection,
42
			final List<MDFormatDescription> mdref,
43
			final int bulkSize,
44
			final RecordParser parser,
45
			final boolean discardRecords) {
46
		this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
47
		this.validBulkOperationList = Lists.newArrayList();
48

  
49
		this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
50
		this.discardedBulkOperationList = Lists.newArrayList();
51

  
52
		this.bulkSize = bulkSize;
53
		this.recordParser = parser;
54
		this.discardRecords = discardRecords;
55
		this.mdref = mdref;
56

  
57
		this.indexRecords = (this.mdref != null &&  !this.mdref.isEmpty());
58
		this.writeOptions = new BulkWriteOptions().ordered(false);
59
	}
60

  
61
	public void insert(final String record) throws MDStoreServiceException {
62
		Map<String, String> recordProperties = null;
63
		try {
64
			recordProperties = recordParser.parseRecord(record);
65

  
66
		} catch (Throwable e) {
67
			if (discardRecords) {
68
				log.debug("unhandled exception: " + e.getMessage());
69
				discardRecord(record);
70
			}
71
		}
72
		Map<String, List<String>> indexRecordField = null;
73
		try {
74
			if (indexRecords) {
75
				indexRecordField = indexFieldRecordParser.parseRecord(record, mdref);
76
			}
77
		} catch (IndexFieldRecordParserException e) {
78
			// could not index record fields
79
			throw new MDStoreServiceException("Are you using the correct type of store / index definition for the records in " + validCollection.getNamespace() + " ?", e);
80
		}
81

  
82
		log.debug("found props: " + recordProperties);
83
		if (recordProperties.containsKey(ID)) {
84
			final DBObject obj = buildDBObject(record, recordProperties, indexRecordField);
85
			if (log.isDebugEnabled()) {
86
				log.debug("Saving object" + obj);
87
			}
88
			validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject(ID, obj.get(ID)), obj, new UpdateOptions().upsert(true)));
89
			if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) {
90
				validCollection.bulkWrite(validBulkOperationList, writeOptions);
91
				validBulkOperationList.clear();
92
			}
93
		} else {
94
			if (discardRecords) {
95
				log.debug("parsed record seems invalid");
96
				discardRecord(record);
97
			}
98
		}
99
	}
100

  
101
	private void discardRecord(final String record) {
102
		discardedBulkOperationList.add(new InsertOneModel(new BasicDBObject(BODY, record)));
103

  
104
		if (((discardedBulkOperationList.size() % bulkSize) == 0) && !discardedBulkOperationList.isEmpty()) {
105
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
106
			discardedBulkOperationList.clear();
107
		}
108
	}
109

  
110
	public void flushBulks() {
111
		//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
112
		//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
113
		if (!validBulkOperationList.isEmpty()) {
114
			validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
115
			validCollection.bulkWrite(validBulkOperationList, writeOptions);
116
		}
117
		if (!discardedBulkOperationList.isEmpty()) {
118
			discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.JOURNALED);
119
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
120
		}
121
		//setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode
122
		validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
123
		discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
124
	}
125

  
126
	protected DBObject buildDBObject(final String record, final Map<String, String> recordProperties, final Map<String, List<String>> indexFieldProperties) {
127
		final DBObject obj = new BasicDBObject();
128
		obj.put(ID, recordProperties.get(ID));
129
		obj.put(ORIGINALID, recordProperties.get(ORIGINALID));
130
		obj.put(BODY, record);
131
		obj.put(TIMESTAMP, Long.valueOf(recordProperties.get(TIMESTAMP)));
132
		if (indexFieldProperties != null)
133
			obj.putAll(Maps.filterKeys(indexFieldProperties, new Predicate<String>() {
134
				//ensure we do not override the mandatory fields above with some unexpected value
135
				@Override
136
				public boolean apply(@Nullable final String s) {
137
					return !s.equalsIgnoreCase(ID) && !s.equalsIgnoreCase(ORIGINALID) && !s.equalsIgnoreCase(BODY) && !s.equalsIgnoreCase(TIMESTAMP);
138
				}
139
			}));
140
		return obj;
141
	}
142

  
143
	private MongoCollection<DBObject> getCollectionWithWriteConcern(MongoCollection<DBObject> collection, WriteConcern writeConcern) {
144
		return collection.withWriteConcern(writeConcern);
145
	}
146

  
147
}
modules/cnr-mongo-mdstore/tags/cnr-mongo-mdstore-6.0.3/deploy.info
1
{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/cnr-mongo-mdstore/trunk/", "deploy_repository": "dnet45-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots", "name": "cnr-mongo-mdstore"}
modules/cnr-mongo-mdstore/tags/cnr-mongo-mdstore-6.0.3/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MDStoreDaoImpl.java
1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

  
3
import java.util.List;
4

  
5
import com.google.common.collect.Lists;
6
import com.google.common.primitives.Ints;
7
import com.mongodb.BasicDBObject;
8
import com.mongodb.BasicDBObjectBuilder;
9
import com.mongodb.DBObject;
10
import com.mongodb.client.AggregateIterable;
11
import com.mongodb.client.MongoCollection;
12
import com.mongodb.client.MongoDatabase;
13
import com.mongodb.client.model.CreateCollectionOptions;
14
import eu.dnetlib.data.mdstore.MDStoreServiceException;
15
import eu.dnetlib.data.mdstore.modular.MDStoreDescription;
16
import eu.dnetlib.data.mdstore.modular.RecordParser;
17
import eu.dnetlib.data.mdstore.modular.RecordParserFactory;
18
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
19
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDBStatus;
20
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
21
import eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager;
22
import eu.dnetlib.data.mdstore.modular.mongodb.utils.MDStoreUtils;
23
import eu.dnetlib.miscutils.collections.FilteredCollection;
24
import eu.dnetlib.miscutils.collections.MappedCollection;
25
import eu.dnetlib.miscutils.datetime.DateUtils;
26
import org.apache.commons.logging.Log;
27
import org.apache.commons.logging.LogFactory;
28
import org.bson.Document;
29
import org.springframework.beans.factory.annotation.Autowired;
30
import org.springframework.beans.factory.annotation.Required;
31

  
32
/**
33
 * Factory bean for MongoMDStore instances.
34
 *
35
 * @author marko
36
 */
37
public class MDStoreDaoImpl implements MDStoreDao {
38

  
39
	public static final String MD_ID = "mdId";
40
	public static final String FORMAT = "format";
41
	public static final String INTERPRETATION = "interpretation";
42
	public static final String LAYOUT = "layout";
43
	public static final String SIZE = "size";
44
	public static final String METADATA_NAME = "metadata";
45
	private static final Log log = LogFactory.getLog(MDStoreDaoImpl.class);
46
	private MongoDatabase db;
47

  
48
	private RecordParserFactory recordParserFactory;
49

  
50
	private boolean discardRecords = true;
51

  
52
	@Autowired
53
	private MDStoreTransactionManager transactionManager;
54

  
55
	/**
56
	 * {@inheritDoc}
57
	 */
58
	@Override
59
	public void createMDStore(final String mdId, final String format, final String interpretation, final String layout) throws MDStoreServiceException {
60
		transactionManager.createMDStore(mdId);
61
		final String internalId = transactionManager.getMDStoreCollection(mdId);
62

  
63
		if (!Lists.newArrayList(getDb().listCollectionNames()).contains(internalId)) {
64
			log.info(String.format("creating collection %s", internalId));
65
			getDb().createCollection(internalId, new CreateCollectionOptions());
66
		}
67
		final MongoCollection<DBObject> coll = getDb().getCollection(METADATA_NAME, DBObject.class);
68
		final BasicDBObject obj = new BasicDBObject();
69
		obj.put(MD_ID, mdId);
70
		obj.put(FORMAT, format);
71
		obj.put(INTERPRETATION, interpretation);
72
		obj.put(LAYOUT, layout);
73
		obj.put(SIZE, 0);
74
		coll.insertOne(obj);
75
	}
76

  
77
	/**
78
	 * {@inheritDoc}
79
	 */
80
	@Override
81
	public void deleteMDStore(final String mdId) throws MDStoreServiceException {
82
		final MongoCollection<DBObject> metadata = getDb().getCollection(METADATA_NAME, DBObject.class);
83
		if (metadata == null) throw new MDStoreServiceException("cannot find metadata collection");
84
		transactionManager.dropMDStore(mdId);
85
		metadata.deleteOne(new BasicDBObject(MD_ID, mdId));
86
		log.info("deleted mdId: " + mdId);
87
	}
88

  
89
	/**
90
	 * {@inheritDoc}
91
	 */
92
	@Override
93
	public MDStore getMDStore(final String mdId) throws MDStoreServiceException {
94
		final String internalId = transactionManager.getMDStoreCollection(mdId);
95
		return new MongoMDStore(mdId, getDb().getCollection(internalId, DBObject.class), getRecordParser(), isDiscardRecords(), getDb());
96
	}
97

  
98
	/**
99
	 * {@inheritDoc}
100
	 */
101
	@Override
102
	public MDStore readMDStore(final String mdId) throws MDStoreServiceException {
103
		final String internalId = transactionManager.readMdStore(mdId);
104
		return new MongoMDStore(mdId, getDb().getCollection(internalId, DBObject.class), getRecordParser(), isDiscardRecords(), getDb());
105
	}
106

  
107
	/**
108
	 * {@inheritDoc}
109
	 */
110
	@Override
111
	public MDStore startTransaction(final String mdId, final boolean refresh) throws MDStoreServiceException {
112
		final String transactionId = transactionManager.startTransaction(mdId, refresh);
113
		return new MongoMDStore(transactionId, getDb().getCollection(transactionId, DBObject.class), getRecordParser(), isDiscardRecords(),
114
				getDb());
115
	}
116

  
117
	private RecordParser getRecordParser() {
118
		final RecordParser parser = getRecordParserFactory().newInstance();
119
		parser.setTimestamp(DateUtils.now());
120
		return parser;
121
	}
122

  
123
	/**
124
	 * {@inheritDoc}
125
	 */
126
	@Override
127
	public List<MDStoreDescription> listMDStores() {
128
		return MappedCollection.listMap(getDb().getCollection(METADATA_NAME, DBObject.class).find(), input -> {
129

  
130
			final String mdId = (String) input.get(MD_ID);
131
			log.debug("Getting info for " + mdId);
132
			final String format = (String) input.get(FORMAT);
133
			final String layout = (String) input.get(LAYOUT);
134
			final String interpretation = (String) input.get(INTERPRETATION);
135
			MongoMDStore currentMDStore = null;
136
			final MDStoreDescription description = new MDStoreDescription();
137
			try {
138
				currentMDStore = (MongoMDStore) getMDStore(mdId);
139
			} catch (final MDStoreServiceException e) {
140
				log.error("Error on retrieving mdstore for getting info mdId " + mdId);
141
			}
142

  
143
			int size = 0;
144
			if (input.containsField(SIZE)) {
145
				log.debug("Size retrieved from metadata for mdId :" + mdId);
146
				size = (Integer) input.get(SIZE);
147
			} else {
148
				if (currentMDStore != null) {
149
					log.debug("Size not Found in metadata for mdId :" + mdId + " calling getCount ");
150
					size = currentMDStore.getSize();
151
					input.put("size", size);
152
					getDb().getCollection(METADATA_NAME, DBObject.class).findOneAndReplace(new BasicDBObject(MD_ID, mdId), input);
153
				}
154
			}
155
			if (currentMDStore != null) {
156
				description.setIndexed(currentMDStore.isIndexed());
157
			}
158
			description.setId(mdId);
159
			description.setFormat(format);
160
			description.setLayout(layout);
161
			description.setInterpretation(interpretation);
162
			description.setSize(size);
163
			return description;
164
		});
165
	}
166

  
167
	/**
168
	 * {@inheritDoc}
169
	 */
170
	@Override
171
	public List<String> listMDStores(final String format, final String layout, final String interpretation) {
172
		return MappedCollection.listMap(
173
				FilteredCollection.listFilter(getDb().getCollection(METADATA_NAME, DBObject.class).find(), MDStoreUtils.dboFilter(format, layout, interpretation)),
174
				MDStoreUtils.mdId());
175
	}
176

  
177
	/**
178
	 * {@inheritDoc}
179
	 */
180
	@Override
181
	public int getCachedSize(final String id) throws MDStoreServiceException {
182
		log.debug("retrieve cached size for mdstore: " + id);
183
		final DBObject desc = getDb().getCollection(METADATA_NAME, DBObject.class).find(new BasicDBObject(MD_ID, id)).first();
184
		if (!desc.containsField(SIZE)) {
185
			desc.put(SIZE, getMDStore(id).getSize());
186
		}
187

  
188
		final Object oSize = desc.get(SIZE);
189
		return (Integer) oSize;
190
	}
191

  
192
	/**
193
	 * {@inheritDoc}
194
	 */
195
	@Override
196
	public void refreshSizes() throws MDStoreServiceException {
197
		for (final MDStoreDescription mdStoreId : listMDStores()) {
198
			refreshSize(mdStoreId.getId());
199
		}
200
	}
201

  
202
	/**
203
	 * {@inheritDoc}
204
	 */
205
	@Override
206
	public int refreshSize(final String mdStoreId) throws MDStoreServiceException {
207
		final int size = (int) getDb().getCollection(transactionManager.getMDStoreCollection(mdStoreId)).count();
208
		final MongoCollection<DBObject> metadata = getDb().getCollection(METADATA_NAME, DBObject.class);
209
		metadata.updateOne(new BasicDBObject(MD_ID, mdStoreId), new BasicDBObject("$set", new BasicDBObject(SIZE, size)));
210
		return size;
211
	}
212

  
213
	@Override
214
	public int getSumOfSizes(final String format, final String layout, final String interpretation) throws MDStoreServiceException {
215
		final MongoCollection<DBObject> metadata = getDb().getCollection(METADATA_NAME, DBObject.class);
216
		BasicDBObject matchObj = (BasicDBObject) BasicDBObjectBuilder.start("$match",
217
				BasicDBObjectBuilder.start("format", format).add("layout", layout).add("interpretation", interpretation).get()).get();
218
		BasicDBObject groupObj = (BasicDBObject) BasicDBObjectBuilder.start("$group",
219
				BasicDBObjectBuilder.start("_id", "").add("total", new BasicDBObject("$sum", "$" + SIZE)).get()).get();
220
		BasicDBObject projectObj = new BasicDBObject("$project", new BasicDBObject("_id", 0).append("total", 1));
221
		List<BasicDBObject> pipeline = Lists.newArrayList(matchObj, groupObj, projectObj);
222
		AggregateIterable<DBObject> output = metadata.aggregate(pipeline, DBObject.class);
223
		DBObject result = output.first();
224
		if (result == null || !result.containsField("total")) {
225
			log.debug("No total found");
226
			return 0;
227
		} else return (Integer) result.get("total");
228
	}
229

  
230
	/**
231
	 * {@inheritDoc}
232
	 */
233
	@Override
234
	public void commit(final String transactionId, final String mdId) throws MDStoreServiceException {
235
		transactionManager.commit(transactionId, mdId, getMDStore(mdId));
236
	}
237

  
238
	/**
239
	 * Getter for property 'db'.
240
	 *
241
	 * @return Value for property 'db'.
242
	 */
243
	public MongoDatabase getDb() {
244
		return db;
245
	}
246

  
247
	/**
248
	 * Setter for property 'db'.
249
	 *
250
	 * @param db
251
	 *            Value to set for property 'db'.
252
	 */
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff