Project

General

Profile

« Previous | Next » 

Revision 46349

migrated mdstore implementation, early tests

View differences:

modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/test/java/eu/dnetlib/msro/workers/aggregation/store/EmbeddedMongodbConfiguration.java
1
package eu.dnetlib.msro.workers.aggregation.store;
2

  
3
import java.util.logging.Level;
4
import java.util.logging.Logger;
5
import javax.annotation.PostConstruct;
6
import javax.annotation.PreDestroy;
7

  
8
import de.flapdoodle.embed.mongo.Command;
9
import de.flapdoodle.embed.mongo.MongodExecutable;
10
import de.flapdoodle.embed.mongo.MongodProcess;
11
import de.flapdoodle.embed.mongo.MongodStarter;
12
import de.flapdoodle.embed.mongo.config.*;
13
import de.flapdoodle.embed.mongo.distribution.Version;
14
import de.flapdoodle.embed.process.config.IRuntimeConfig;
15
import de.flapdoodle.embed.process.config.io.ProcessOutput;
16
import de.flapdoodle.embed.process.io.Processors;
17
import de.flapdoodle.embed.process.io.progress.LoggingProgressListener;
18
import de.flapdoodle.embed.process.runtime.Network;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.springframework.beans.factory.annotation.Value;
22
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
23
import org.springframework.context.annotation.ComponentScan;
24
import org.springframework.context.annotation.Configuration;
25
import org.springframework.context.annotation.Profile;
26
import org.springframework.stereotype.Component;
27

  
28
/**
29
 * Created by claudio on 17/03/2017.
30
 */
31
@Profile("mdstoreTest")
32
@Component
33
@Configuration
34
@EnableAutoConfiguration
35
@ComponentScan(basePackages = "eu.dnetlib.msro.workers.aggregation.store")
36
public class EmbeddedMongodbConfiguration {
37

  
38
	private static final Log log = LogFactory.getLog(EmbeddedMongodbConfiguration.class);
39

  
40
	/**
41
	 * please store Starter or RuntimeConfig in a static final field
42
	 * if you want to use artifact store caching (or else disable caching)
43
	 */
44
	private MongodStarter starter;
45

  
46
	private MongodExecutable mongodExe;
47

  
48
	private MongodProcess mongod;
49

  
50
	private IMongodConfig mongodConfig;
51

  
52
	@Value("${msro.worker.mdstore.mongodb.port}")
53
	private int port;
54

  
55
	@PostConstruct
56
	public void init() throws Exception {
57
		log.info("starting mongodb embedded instance");
58

  
59
		Logger logger = Logger.getLogger(getClass().getName());
60

  
61
		ProcessOutput processOutput = new ProcessOutput(
62
				Processors.logTo(logger, Level.INFO),
63
				Processors.logTo(logger, Level.SEVERE),
64
				Processors.named("[console>]", Processors.logTo(logger, Level.FINE)));
65

  
66
		IRuntimeConfig runtimeConfig = new RuntimeConfigBuilder()
67
				.defaultsWithLogger(Command.MongoD,logger)
68
				.processOutput(processOutput)
69
				.artifactStore(new ExtractedArtifactStoreBuilder()
70
						.defaults(Command.MongoD)
71
						.download(new DownloadConfigBuilder()
72
								.defaultsForCommand(Command.MongoD)
73
								.progressListener(new LoggingProgressListener(logger, Level.FINE))))
74
				.build();
75

  
76
		mongodConfig = new MongodConfigBuilder()
77
				.version(Version.Main.PRODUCTION)
78
				.net(new Net(port, Network.localhostIsIPv6()))
79
				.build();
80

  
81
		starter = MongodStarter.getInstance(runtimeConfig);
82

  
83
		mongodExe = starter.prepare(mongodConfig);
84
		mongod = mongodExe.start();
85
	}
86

  
87
	@PreDestroy
88
	public void tearDown() throws Exception {
89
		log.info("stopping mongodb embedded instance");
90
		mongod.stop();
91
		mongodExe.stop();
92
	}
93

  
94
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/test/java/eu/dnetlib/msro/workers/aggregation/store/MDStoreTest.java
1
package eu.dnetlib.msro.workers.aggregation.store;
2

  
3
import org.junit.Assert;
4
import org.junit.Test;
5
import org.junit.runner.RunWith;
6
import org.springframework.beans.factory.annotation.Autowired;
7
import org.springframework.boot.test.context.SpringBootTest;
8
import org.springframework.test.context.ActiveProfiles;
9
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
10

  
11
/**
12
 * Created by claudio on 10/03/2017.
13
 */
14
@ActiveProfiles("mdstoreTest")
15
@RunWith(SpringJUnit4ClassRunner.class)
16
@SpringBootTest(classes = EmbeddedMongodbConfiguration.class)
17
public class MDStoreTest {
18

  
19
	@Autowired
20
	private MDStoreDao dao;
21

  
22
	@Test
23
	public void testMDStoreCreate() throws MDStoreException {
24
		Assert.assertNotNull(dao);
25

  
26
		final String mdId = "id";
27
		dao.createMDStore(mdId, "format", "layout", "interpretation");
28
		final MDStore mdstore = dao.getMDStore(mdId);
29
		Assert.assertNotNull(mdstore);
30
	}
31

  
32
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/test/resources/application.properties
1
msro.worker.mdstore.mongodb.port=27018
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/WorkerConfiguration.java
1 1
package eu.dnetlib.msro;
2 2

  
3
import io.prometheus.client.exporter.MetricsServlet;
4
import io.prometheus.client.hotspot.DefaultExports;
5
import org.springframework.beans.factory.annotation.Value;
6
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
7
import org.springframework.boot.web.servlet.ServletRegistrationBean;
3 8
import org.springframework.context.annotation.Bean;
4 9
import org.springframework.context.annotation.Configuration;
5 10

  
......
13 18
	public Docket api() {
14 19
		return ApiDocUtils.newSwaggerDocket();
15 20
	}
21

  
22
	@Bean
23
	@ConditionalOnMissingBean(name = "prometheusMetricsServletRegistrationBean")
24
	ServletRegistrationBean prometheusMetricsServletRegistrationBean(@Value("${prometheus.metrics.path:/prometheus}") String metricsPath) {
25
		DefaultExports.initialize();
26
		return new ServletRegistrationBean(new MetricsServlet(), metricsPath);
27
	}
16 28
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/CollectJobNode.java
3 3
import java.io.StringReader;
4 4
import java.util.stream.Stream;
5 5

  
6
import org.dom4j.Document;
7
import org.dom4j.Node;
8
import org.dom4j.io.SAXReader;
9
import org.springframework.beans.factory.annotation.Autowired;
10
import org.springframework.context.annotation.Scope;
11
import org.springframework.stereotype.Component;
12

  
13 6
import eu.dnetlib.clients.is.InformationServiceClient;
14 7
import eu.dnetlib.msro.annotations.Direction;
15 8
import eu.dnetlib.msro.annotations.EnvParam;
......
17 10
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
18 11
import eu.dnetlib.msro.workflows.graph.Arc;
19 12
import eu.dnetlib.msro.workflows.nodes.AbstractProcessNode;
13
import org.dom4j.Document;
14
import org.dom4j.Node;
15
import org.dom4j.io.SAXReader;
16
import org.springframework.beans.factory.annotation.Autowired;
17
import org.springframework.context.annotation.Scope;
18
import org.springframework.stereotype.Component;
20 19

  
21 20
@Component
22 21
@Scope("prototype")
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/store/MDStoreFeederJobNode.java
1
package eu.dnetlib.msro.workers.aggregation.collect.store;
2

  
3
import org.springframework.context.annotation.Scope;
4
import org.springframework.stereotype.Component;
5

  
6
import eu.dnetlib.msro.annotations.ProcessNode;
7
import eu.dnetlib.msro.workflows.nodes.AbstractProcessNode;
8

  
9
@Component
10
@Scope("prototype")
11
@ProcessNode("MdStoreFeeder")
12
public class MDStoreFeederJobNode extends AbstractProcessNode {
13

  
14
	@Override
15
	protected String execute() throws Exception {
16
		// TODO Auto-generated method stub
17
		return null;
18
	}
19

  
20
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/store/MDStore.java
1
package eu.dnetlib.msro.workers.aggregation.collect.store;
2

  
3

  
4
public class MDStore {
5

  
6
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/store/MDStoreTransactionInfo.java
1
package eu.dnetlib.msro.workers.aggregation.store;
2

  
3
import java.util.Date;
4

  
5
/**
6
 * Created by claudio on 13/03/2017.
7
 */
8
public class MDStoreTransactionInfo {
9

  
10
	private String id;
11
	private Boolean refresh;
12
	private Date date;
13
	private long size;
14

  
15
	/**
16
	 * @return the date
17
	 */
18
	public Date getDate() {
19
		return date;
20
	}
21

  
22
	/**
23
	 * @param date the date to set
24
	 */
25
	public void setDate(final Date date) {
26
		this.date = date;
27
	}
28

  
29
	/**
30
	 * @return the id
31
	 */
32
	public String getId() {
33
		return id;
34
	}
35

  
36
	/**
37
	 * @param id the id to set
38
	 */
39
	public void setId(final String id) {
40
		this.id = id;
41
	}
42

  
43
	/**
44
	 * @return the refresh
45
	 */
46
	public Boolean getRefresh() {
47
		return refresh;
48
	}
49

  
50
	/**
51
	 * @param refresh the refresh to set
52
	 */
53
	public void setRefresh(final Boolean refresh) {
54
		this.refresh = refresh;
55
	}
56

  
57
	/**
58
	 * @return the size
59
	 */
60
	public long getSize() {
61
		return size;
62
	}
63

  
64
	/**
65
	 * @param size the size to set
66
	 */
67
	public void setSize(final long size) {
68
		this.size = size;
69
	}
70
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/store/mongo/MDStoreTransactionManagerImpl.java
1
package eu.dnetlib.msro.workers.aggregation.store.mongo;
2

  
3
import java.time.Duration;
4
import java.time.LocalDate;
5
import java.time.ZoneId;
6
import java.util.Collections;
7
import java.util.Date;
8
import java.util.List;
9
import java.util.stream.Collectors;
10

  
11
import com.google.common.collect.Lists;
12
import com.mongodb.*;
13
import com.mongodb.client.FindIterable;
14
import com.mongodb.client.MongoCollection;
15
import com.mongodb.client.MongoDatabase;
16
import com.mongodb.client.MongoIterable;
17
import com.mongodb.client.model.Filters;
18
import com.mongodb.client.model.UpdateOptions;
19
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
20
import eu.dnetlib.msro.workers.aggregation.store.*;
21
import org.apache.commons.lang3.StringUtils;
22
import org.apache.commons.logging.Log;
23
import org.apache.commons.logging.LogFactory;
24
import org.bson.conversions.Bson;
25
import org.springframework.beans.factory.annotation.Autowired;
26
import org.springframework.stereotype.Component;
27

  
28
/**
29
 * Created by claudio on 13/03/2017.
30
 */
31
@Component
32
public class MDStoreTransactionManagerImpl implements MDStoreTransactionManager {
33

  
34
	/**
35
	 * The Constant log.
36
	 */
37
	private static final Log log = LogFactory.getLog(MDStoreTransactionManagerImpl.class);
38

  
39
	/**
40
	 * The table name.
41
	 */
42
	private static String TABLE_NAME = "metadataManager";
43

  
44
	/**
45
	 * The max number of concurrent transactions per mdstore.
46
	 */
47
	private int maxTransactions = 1;
48

  
49
	/**
50
	 * The db.
51
	 */
52
	@Autowired
53
	private MongoDatabase db;
54

  
55
	/**
56
	 * The manager table.
57
	 */
58
	private MongoCollection<DBObject> managerTable;
59

  
60
	/**
61
	 * The expired days.
62
	 */
63
	private int expiredDays;
64

  
65
	/**
66
	 * Bootstrap manager.
67
	 */
68
	private void bootstrapManager() {
69
		log.debug("Bootstrap Manager start");
70
		final MongoCollection<DBObject> metadataColl = db.getCollection("metadata", DBObject.class);
71
		final FindIterable<DBObject> values = metadataColl.find();
72
		this.setManagerTable(db.getCollection(TABLE_NAME, DBObject.class));
73
		for (DBObject object : values) {
74
			final String id = (String) object.get("mdId");
75
			String newId = id;
76
			if (id.contains("_")) {
77
				newId = StringUtils.substringBefore(id, "_");
78
			}
79
			final DBObject input = BasicDBObjectBuilder.start()
80
				.add("mdId", id)
81
				.add("currentId", newId)
82
				.add("expiring", new String[] {})
83
				.add("transactions", new String[] {})
84
				.get();
85
			getManagerTable().insertOne(input);
86
			log.debug(String.format("Added %s to Metadata Manager data structure", id));
87

  
88
		}
89
		final BasicDBObject ensureIndex = new BasicDBObject();
90
		ensureIndex.put("mdId", 1);
91
		log.debug("Create index in MetadaManager ");
92
		this.getManagerTable().createIndex(ensureIndex);
93
	}
94

  
95
	/**
96
	 * Gets the DBObject describing an mdstore. null if there is no mdstore with the given id.
97
	 *
98
	 * @param mdstoreID
99
	 * @return DBObject or null
100
	 */
101
	private DBObject getMDStoreAsDBObject(String mdstoreID) {
102
		final BasicDBObject query = new BasicDBObject();
103
		query.put("mdId", mdstoreID);
104
		final FindIterable<DBObject> it = this.getManagerTable().find(query);
105
		DBObject mdstoreInfo = it.first();
106
		return mdstoreInfo;
107
	}
108

  
109
	/**
110
	 * {@inheritDoc}
111
	 */
112
	@Override
113
	public void verifyConsistency() throws MDStoreException {
114
		if (this.getManagerTable() == null) {
115
			final int size = DnetStreamSupport.generateStreamFromIterator(
116
					db.listCollectionNames().iterator())
117
					.filter(it -> it.contains(TABLE_NAME))
118
					.collect(Collectors.toList())
119
					.size();
120

  
121
			if (size == 0)
122
				bootstrapManager();
123
			else {
124
				this.setManagerTable(db.getCollection(TABLE_NAME, DBObject.class));
125
			}
126
		}
127
		if (this.getManagerTable() == null) throw new MDStoreException("Something bad happen, unable to create managerTable");
128
	}
129

  
130
	/**
131
	 * {@inheritDoc}
132
	 *
133
	 * @see MDStoreTransactionManager#createMDStore(String)
134
	 */
135
	@Override
136
	public void createMDStore(final String mdId) throws MDStoreException {
137
		log.debug("Creating new mdstore");
138
		verifyConsistency();
139
		String newId = mdId;
140
		if (mdId.contains("_")) {
141
			newId = StringUtils.substringBefore(mdId, "_");
142
		}
143
		final BasicDBObject instance = new BasicDBObject();
144
		instance.put("mdId", mdId);
145
		instance.put("currentId", newId);
146
		instance.put("expiring", new String[] {});
147
		getManagerTable().insertOne(instance);
148
	}
149

  
150
	/**
151
	 * {@inheritDoc}
152
	 *
153
	 * @see MDStoreTransactionManager#dropMDStore(String)
154
	 */
155
	@Override
156
	public void dropMDStore(final String mdId) throws MDStoreException {
157
		verifyConsistency();
158
		log.debug("Droping MDStore: " + mdId);
159
		final BasicDBObject query = new BasicDBObject();
160
		query.put("mdId", mdId);
161
		final DBObject dropped = this.getManagerTable().findOneAndDelete(query);
162
		garbage();
163
		final String collectionName = (String) dropped.get("currentId");
164
		this.db.getCollection(collectionName).drop();
165
		this.db.getCollection("discarded-" + collectionName).drop();
166
	}
167

  
168
	/**
169
	 * {@inheritDoc}
170
	 *
171
	 * @see MDStoreTransactionManager#getMDStoreCollection(String)
172
	 */
173
	@Override
174
	public String getMDStoreCollection(final String mdId) throws MDStoreException {
175
		verifyConsistency();
176
		DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
177
		if (mdstoreInfo != null)
178
			return (String) mdstoreInfo.get("currentId");
179
		else return null;
180
	}
181

  
182
	/**
183
	 * {@inheritDoc}
184
	 *
185
	 * @see MDStoreTransactionManager#startTransaction(String, boolean)
186
	 */
187
	@Override
188
	public String startTransaction(final String mdId, final boolean refresh) throws MDStoreException {
189
		verifyConsistency();
190
		log.info("Start transaction for metadata store " + mdId);
191
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
192
		if (mdstoreInfo == null) throw new MDStoreException("Error, unable to find Mdstore with Id " + mdId);
193
		String idCreation = StringUtils.substringBefore(mdId, "_");
194
		idCreation = idCreation + "::" + System.currentTimeMillis();
195

  
196
		BasicDBList values;
197
		if (mdstoreInfo.containsField("transactions")) {
198
			values = (BasicDBList) mdstoreInfo.get("transactions");
199
			if (values.size() > getMaxTransactions())
200
				throw new MDStoreException(String.format("Cannot create more than %s transactions, found: %s, mdId: %s", getMaxTransactions(), values.size(), mdId));
201
		} else {
202
			values = new BasicDBList();
203
		}
204
		final BasicDBObject transactionMetadata = new BasicDBObject();
205
		transactionMetadata.put("id", idCreation.toString());
206
		transactionMetadata.put("refresh", refresh);
207
		transactionMetadata.put("date", new Date());
208
		values.add(transactionMetadata);
209
		mdstoreInfo.put("transactions", values);
210
		getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
211
		return idCreation.toString();
212
	}
213

  
214
	/**
215
	 * {@inheritDoc}
216
	 *
217
	 * @see MDStoreTransactionManager#commit(String, String,
218
	 * MDStore)
219
	 */
220
	@Override
221
	public boolean commit(final String transactionId, final String mdstoreId, final MDStore current) throws MDStoreException {
222
		verifyConsistency();
223
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdstoreId);
224
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdstoreId);
225
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get("transactions");
226
		final DBObject transaction = findTransaction(transactions, transactionId);
227
		if (transaction == null) throw new  MDStoreException("Error, unable to find transaction with Id " + transactionId);
228
		final boolean refresh = (Boolean) transaction.get("refresh");
229
		transactions.remove(transaction);
230
		final String oldId = (String) mdstoreInfo.get("currentId");
231
		if (refresh) {
232
			mdstoreInfo.put("currentId", transactionId);
233
			final BasicDBList stillUsed = (BasicDBList) mdstoreInfo.get("expiring");
234
			if (stillUsed.size() == 0) {
235
				db.getCollection(oldId).drop();
236
				db.getCollection("discarded-" + oldId).drop();
237
			}
238
			log.debug("Replaced collection ");
239
		} else {
240
			log.debug("commit incremental ");
241
			updateIncremental(transactionId, oldId);
242
			db.getCollection(transactionId).drop();
243
			db.getCollection("discarded-" + transactionId).drop();
244
		}
245
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
246

  
247
		log.info("Committed transaction for metadata store " + mdstoreId);
248
		return true;
249
	}
250

  
251
	/**
252
	 * Find transaction.
253
	 *
254
	 * @param transactions  the transactions
255
	 * @param transactionId the transaction id
256
	 * @return the DB object
257
	 */
258
	private DBObject findTransaction(final BasicDBList transactions, final String transactionId) {
259
		if (transactions.size() == 0) return null;
260
		for (int i = 0; i < transactions.size(); i++) {
261
			final BasicDBObject transaction = (BasicDBObject) transactions.get(i);
262
			final String id = (String) transaction.get("id");
263
			if (transactionId.equals(id)) return transaction;
264
		}
265
		return null;
266
	}
267

  
268
	/**
269
	 * {@inheritDoc}
270
	 *
271
	 * @see MDStoreTransactionManager#readMdStore(String)
272
	 */
273
	@Override
274
	public String readMdStore(final String mdStoreId) throws  MDStoreException {
275
		verifyConsistency();
276
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
277
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdStoreId);
278
		final String currentId = (String) mdstoreInfo.get("currentId");
279
		final BasicDBList values = (BasicDBList) mdstoreInfo.get("expiring");
280
		updateMdstoreUsed(values, currentId);
281
		this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
282
		return currentId;
283
	}
284

  
285
	/**
286
	 * Update mdstore used.
287
	 *
288
	 * @param values the values
289
	 * @param mdId   the md id
290
	 */
291
	private void updateMdstoreUsed(final BasicDBList values, final String mdId) {
292
		if (values.size() > 0) {
293
			for (int i = 0; i < values.size(); i++) {
294
				final DBObject obj = (DBObject) values.get(i);
295
				final String id = (String) obj.get("id");
296
				if (mdId.equals(id)) {
297
					obj.put("lastRead", new Date());
298
					return;
299
				}
300
			}
301
		}
302
		final BasicDBObject readStore = new BasicDBObject();
303
		readStore.put("id", mdId);
304
		readStore.put("lastRead", new Date());
305
		values.add(readStore);
306
	}
307

  
308
	/**
309
	 * Gets the manager table.
310
	 *
311
	 * @return the managerTable
312
	 */
313
	public MongoCollection<DBObject> getManagerTable() {
314
		return managerTable;
315
	}
316

  
317
	/**
318
	 * Sets the manager table.
319
	 *
320
	 * @param managerTable the managerTable to set
321
	 */
322
	public void setManagerTable(final MongoCollection<DBObject> managerTable) {
323
		this.managerTable = managerTable;
324
	}
325

  
326
	/*
327
	 * (non-Javadoc)
328
	 *
329
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
330
	 */
331
	@Override
332
	public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdStoreId) throws  MDStoreException {
333
		verifyConsistency();
334
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
335
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdStoreId);
336
		final MDStoreManagerInfo result = new MDStoreManagerInfo();
337
		result.setCurrentId((String) mdstoreInfo.get("currentId"));
338
		result.setMdId((String) mdstoreInfo.get("mdId"));
339
		final BasicDBList values = (BasicDBList) mdstoreInfo.get("expiring");
340
		for (int i = 0; i < values.size(); i++) {
341
			final MDStoreExpiredInfo stillused = new MDStoreExpiredInfo();
342
			final DBObject value = (DBObject) values.get(i);
343
			stillused.setId((String) value.get("id"));
344
			stillused.setLastRead((Date) value.get("lastRead"));
345
			result.addExpiredItem(stillused);
346
		}
347
		final BasicDBList transactions = (BasicDBList) mdstoreInfo.get("transactions");
348
		if (transactions != null) {
349
			for (int i = 0; i < transactions.size(); i++) {
350
				final MDStoreTransactionInfo transaction = new MDStoreTransactionInfo();
351
				final DBObject value = (DBObject) transactions.get(i);
352
				final String transactionId = (String) value.get("id");
353
				transaction.setId(transactionId);
354
				transaction.setDate((Date) value.get("date"));
355
				transaction.setRefresh((Boolean) value.get("refresh"));
356
				transaction.setSize(db.getCollection(transactionId).count());
357
				result.addTransactionInfo(transaction);
358
			}
359
		}
360
		return result;
361
	}
362

  
363
	/*
364
	 * (non-Javadoc)
365
	 *
366
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
367
	 */
368
	@Override
369
	public Boolean dropUsed(final String mdId, final String idToDrop) throws  MDStoreException {
370
		verifyConsistency();
371
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
372
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdId);
373
		return dropStore(mdstoreInfo, idToDrop, "expiring");
374
	}
375

  
376
	private boolean dropStore(DBObject mdstoreInfo, final String idToDrop, String transactionListName) throws  MDStoreException {
377
		final BasicDBList transactionList = (BasicDBList) mdstoreInfo.get(transactionListName);
378
		for (int i = 0; i < transactionList.size(); i++) {
379
			final DBObject value = (DBObject) transactionList.get(i);
380
			final String currentUsedId = (String) value.get("id");
381
			if (currentUsedId.equals(idToDrop)) {
382
				db.getCollection(idToDrop).drop();
383
				db.getCollection("discarded-" + idToDrop).drop();
384
				transactionList.remove(value);
385
				mdstoreInfo.put(transactionListName, transactionList);
386
				this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", mdstoreInfo.get("_id")), mdstoreInfo);
387
				return true;
388
			}
389
		}
390
		throw new  MDStoreException("Error, unable to drop collection " + idToDrop);
391
	}
392

  
393
	/*
394
	 * (non-Javadoc)
395
	 *
396
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#garbage()
397
	 */
398
	@Override
399
	public void garbage() throws  MDStoreException {
400
		verifyConsistency();
401
		log.info("Start garbage collection of MdStore");
402
		final FindIterable<DBObject> it = this.managerTable.find();
403
		int totalDeleted = 0;
404
		for (DBObject currentObject : it) {
405
			if (log.isDebugEnabled()) {
406
				log.debug("start to check id: " + currentObject.get("currentId"));
407
			}
408
			garbageExpiring(currentObject, (String) currentObject.get("currentId"));
409
			garbageTransactions(currentObject, (String) currentObject.get("currentId"));
410
			this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", currentObject.get("_id")), currentObject);
411
		}
412

  
413
		// DELETING Collection that are not in the metadataManager table
414
		MongoIterable<String> collections = this.db.listCollectionNames();
415
		for (String collection : collections) {
416
			if ((collection.length() > 30) && (collection.contains("discarded-") == false) && (collection.contains("resolved") == false)) {
417
				final DBObject item = getMetadataObjectForCollections(collection);
418

  
419
				if (shouldDelete(collection, item)) {
420
					if (log.isDebugEnabled()) {
421
						log.debug("delete collection: " + collection + " from mongo");
422
					}
423
					db.getCollection(collection).drop();
424
					db.getCollection("discarded-" + collection).drop();
425
					if (log.isDebugEnabled()) {
426
						log.debug("delete collection: discarded-" + collection + " from mongo");
427
					}
428
				}
429
			}
430
		}
431

  
432
		log.info("Complete garbage collection of MdStore, total store deleted: " + totalDeleted);
433
	}
434

  
435
	private DBObject getMetadataObjectForCollections(final String collectionName) {
436
		if (collectionName == null) return null;
437
		final String postfix = "_TURTdG9yZURTUmVzb3VyY2VzL01EU3RvcmVEU1Jlc291cmNlVHlwZQ==";
438
		final String tmp = collectionName.contains("discarded-") == true ? StringUtils.substringAfter(collectionName, "discarded-") : collectionName;
439
		final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + postfix;
440

  
441
		//DBObject query = QueryBuilder.start("mdId").is(collectionNameCleaned).get();
442
		Bson query = Filters.eq("mdId", collectionNameCleaned);
443
		return this.managerTable.find(query).first();
444

  
445
	}
446

  
447
	private boolean shouldDelete(final String collectionName, final DBObject metadataManagerInstance) {
448
		log.debug("should delete instance " + metadataManagerInstance);
449
		if ((metadataManagerInstance == null) || (metadataManagerInstance.get("currentId") == null)) {
450
			log.debug("the instance has not currentID");
451
			return true;
452
		}
453
		String currentId = (String) metadataManagerInstance.get("currentId");
454
		if (collectionName.equals(currentId)) return false;
455
		BasicDBList expiringList = (BasicDBList) metadataManagerInstance.get("expiring");
456
		if (findInList(expiringList, collectionName, "id") == true) return false;
457
		BasicDBList transactionsList = (BasicDBList) metadataManagerInstance.get("transactions");
458
		return findInList(transactionsList, collectionName, "id") != true;
459
	}
460

  
461
	private boolean findInList(final BasicDBList list, final String object, final String tagname) {
462
		if (list == null) return false;
463
		for (int i = 0; i < list.size(); i++) {
464
			DBObject currentObject = (DBObject) list.get(i);
465
			final String id = (String) currentObject.get(tagname);
466
			if (id.equals(object)) return true;
467
		}
468
		return false;
469
	}
470

  
471
	/**
472
	 * Delete.
473
	 *
474
	 * @param list     the list
475
	 * @param toRemove the to remove
476
	 */
477
	private void delete(final BasicDBList list, final List<DBObject> toRemove) {
478
		for (final DBObject obj : toRemove) {
479
			if (log.isDebugEnabled()) {
480
				log.debug("deleting " + obj);
481
			}
482
			list.remove(obj);
483
		}
484
	}
485

  
486
	/**
487
	 * Garbage transactions.
488
	 *
489
	 * @param currentObject the current object
490
	 * @param currentId     the current id
491
	 */
492
	private void garbageTransactions(final DBObject currentObject, final String currentId) {
493
		if (log.isDebugEnabled()) {
494
			log.debug("Start garbage transactions ");
495
		}
496

  
497
		final BasicDBList expiring = (BasicDBList) currentObject.get("transactions");
498
		if ((expiring == null) || (expiring.size() <= getMaxTransactions())) return;
499

  
500
		List<DBObject> expiringList = Lists.newArrayList();
501

  
502
		for (int i = 0; i < expiring.size(); i++) {
503
			final DBObject cobj = (DBObject) expiring.get(i);
504
			if (cobj != null) {
505
				expiringList.add((DBObject) expiring.get(i));
506
			}
507

  
508
		}
509

  
510
		Collections.sort(expiringList, MDStoreUtils.getComparatorOnDate());
511

  
512
		List<DBObject> toRemove = Lists.newArrayList();
513
		int i = 0;
514

  
515
		// We should remove the k item less recent
516
		// where k = numberOftotalTransaction - maxNumberOfTransaction
517
		// k = numberOfItemToRemove
518

  
519
		while (((expiringList.size() - toRemove.size()) > getMaxTransactions()) || (i < expiringList.size())) {
520
			DBObject currentObj = expiringList.get(i++);
521
			String objectId = (String) currentObj.get("id");
522
			if (!objectId.equals(currentId)) {
523
				if (log.isDebugEnabled()) {
524
					log.debug("delete collection: " + objectId + " from mongo");
525
				}
526
				db.getCollection(objectId).drop();
527
				db.getCollection("discarded-" + objectId).drop();
528
				if (log.isDebugEnabled()) {
529
					log.debug("delete collection: discarded-" + objectId + " from mongo");
530
				}
531
				toRemove.add(currentObj);
532
			} else {
533
				if (log.isDebugEnabled()) {
534
					log.debug("Cannot remove transaction " + objectId + " because is the currentId: " + currentId);
535
				}
536
			}
537
		}
538

  
539
		delete(expiring, toRemove);
540
		log.info("Deleted " + toRemove.size() + " transactions, mdStore Id:" + currentObject.get("mdId"));
541
	}
542

  
543
	/**
544
	 * Garbage expiring.
545
	 *
546
	 * @param currentObject the current object
547
	 * @param currentId     the current id
548
	 */
549
	private void garbageExpiring(final DBObject currentObject, final String currentId) {
550
		if (log.isDebugEnabled()) {
551
			log.debug("Start to search expiring mdstores for id: " + currentObject.get("mdId"));
552
		}
553
		final BasicDBList expiring = (BasicDBList) currentObject.get("expiring");
554
		final List<DBObject> toRemove = Lists.newArrayList();
555
		if (log.isDebugEnabled()) {
556
			if (expiring == null) {
557
				log.debug("expiring list is null");
558
			} else {
559
				log.debug("expiring list size is :" + expiring.size());
560
			}
561
		}
562
		if ((expiring == null) || (expiring.size() == 0)) {
563
			log.debug("Deleted  0  expired  collections, mdStore Id:" + currentObject.get("mdId"));
564
			return;
565
		}
566
		for (int i = 0; i < expiring.size(); i++) {
567
			final DBObject currentExpiredStore = (DBObject) expiring.get(i);
568
			final String currentUsedId = (String) currentExpiredStore.get("id");
569
			final long d = getExpiringDays(currentExpiredStore, "lastRead");
570
			if (log.isDebugEnabled()) {
571
				log.debug("the store :" + currentId + " expired since " + d + "days ");
572
			}
573
			// DELETE the collection where the last time they was read
574
			// is more than 3 days ago
575
			if (d > getExpiredDays()) {
576
				if (currentUsedId.equals(currentId) == false) {
577
					db.getCollection(currentUsedId).drop();
578
					db.getCollection("discarded-" + currentUsedId).drop();
579
					log.debug("deleted collection " + currentUsedId);
580
				}
581
				toRemove.add(currentExpiredStore);
582
			}
583
		}
584
		delete(expiring, toRemove);
585
		log.debug("Deleted expired " + toRemove.size() + "collections, mdStore Id:" + currentObject.get("mdId"));
586
	}
587

  
588
	/**
589
	 * Gets the expiring days.
590
	 *
591
	 * @param value     the value
592
	 * @param paramName the param name
593
	 * @return the expiring days
594
	 */
595
	private long getExpiringDays(final DBObject value, final String paramName) {
596

  
597
		final Date lastRead = (Date) value.get(paramName);
598

  
599
		final LocalDate readDate = lastRead.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
600
		return Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays();
601
	}
602

  
603
	/**
604
	 * Gets the expired days.
605
	 *
606
	 * @return the expiredDays
607
	 */
608
	public int getExpiredDays() {
609
		if (this.expiredDays == 0) return 3;
610
		return expiredDays;
611
	}
612

  
613
	/**
614
	 * Sets the expired days.
615
	 *
616
	 * @param expiredDays the expiredDays to set
617
	 */
618
	public void setExpiredDays(final int expiredDays) {
619
		this.expiredDays = expiredDays;
620
	}
621

  
622
	/**
623
	 * Update incremental.
624
	 *
625
	 * @param transactionId the transaction id
626
	 * @param currentId     the current id
627
	 */
628
	private void updateIncremental(final String transactionId, final String currentId) {
629
		final MongoCollection<DBObject> transaction = db.getCollection(transactionId, DBObject.class);
630
		final MongoCollection<DBObject> mdstore = db.getCollection(currentId, DBObject.class);
631
		final FindIterable<DBObject> it = transaction.find();
632
		for (DBObject currentObj : it) {
633

  
634
			BasicDBObject newObj = new BasicDBObject();
635

  
636
			final String id = (String) currentObj.get("id");
637
			final String body = (String) currentObj.get("body");
638
			newObj.put("id", id);
639
			newObj.put("body", body);
640
			if (StringUtils.isNotBlank(id)) {
641
				//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
642
				//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
643
				final MongoCollection<DBObject> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
644
				mdstoreWrite.replaceOne(new BasicDBObject("id", id), newObj, new UpdateOptions().upsert(true));
645
			}
646
		}
647
	}
648

  
649
	/*
650
	 * (non-Javadoc)
651
	 *
652
	 * @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
653
	 */
654
	@Override
655
	public Boolean dropTransaction(final String mdId, final String idToDrop) throws  MDStoreException {
656
		verifyConsistency();
657
		final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId);
658
		if (mdstoreInfo == null) throw new  MDStoreException("Error, unable to find Mdstore with Id " + mdId);
659
		return dropStore(mdstoreInfo, idToDrop, "transactions");
660
	}
661

  
662
	public void garbageTransactionsOnStart() throws  MDStoreException {
663
		verifyConsistency();
664
		FindIterable<DBObject> it = this.managerTable.find();
665
		for (DBObject currentObject : it) {
666
			final BasicDBList transactions = (BasicDBList) currentObject.get("transactions");
667
			if ((transactions != null) && (transactions.size() > 0)) {
668
				for (int i = 0; i < transactions.size(); i++) {
669
					final DBObject currentTransactions = (DBObject) transactions.get(i);
670
					final String id = (String) currentTransactions.get("id");
671
					db.getCollection(id).drop();
672
					db.getCollection("discarded-" + id).drop();
673
					log.debug("deleted collection " + id);
674
				}
675
				currentObject.put("transactions", new BasicDBList());
676
				this.getManagerTable().findOneAndReplace(new BasicDBObject("_id", currentObject.get("_id")), currentObject);
677
			}
678
		}
679
	}
680

  
681
	/**
682
	 * Gets the max transactions.
683
	 *
684
	 * @return the maxTransactions
685
	 */
686
	public int getMaxTransactions() {
687
		return maxTransactions;
688
	}
689

  
690
	/**
691
	 * Sets the max transactions.
692
	 *
693
	 * @param maxTransactions the maxTransactions to set
694
	 */
695
	public void setMaxTransactions(final int maxTransactions) {
696
		this.maxTransactions = maxTransactions;
697
	}
698

  
699
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/store/mongo/MongoDBProperties.java
1
package eu.dnetlib.msro.workers.aggregation.store.mongo;
2

  
3
import org.springframework.boot.context.properties.ConfigurationProperties;
4
import org.springframework.stereotype.Component;
5

  
6
/**
7
 * Created by claudio on 13/03/2017.
8
 */
9
@Component
10
@ConfigurationProperties(prefix = "msro.worker.mdstore.mongodb")
11
public class MongoDBProperties {
12

  
13
	private String host = "localhost";
14

  
15
	private int port = 27017;
16

  
17
	private String dbname = "mdstore";
18

  
19
	public String getHost() {
20
		return host;
21
	}
22

  
23
	public void setHost(final String host) {
24
		this.host = host;
25
	}
26

  
27
	public int getPort() {
28
		return port;
29
	}
30

  
31
	public void setPort(final int port) {
32
		this.port = port;
33
	}
34

  
35
	public String getDbname() {
36
		return dbname;
37
	}
38

  
39
	public void setDbname(final String dbname) {
40
		this.dbname = dbname;
41
	}
42
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/store/mongo/MDStoreDaoImpl.java
1
package eu.dnetlib.msro.workers.aggregation.store.mongo;
2

  
3
import java.util.List;
4
import java.util.stream.Collectors;
5

  
6
import com.google.common.collect.Lists;
7
import com.google.common.primitives.Ints;
8
import com.mongodb.BasicDBObject;
9
import com.mongodb.BasicDBObjectBuilder;
10
import com.mongodb.DBObject;
11
import com.mongodb.client.AggregateIterable;
12
import com.mongodb.client.MongoCollection;
13
import com.mongodb.client.MongoDatabase;
14
import com.mongodb.client.model.CreateCollectionOptions;
15
import eu.dnetlib.miscutils.datetime.DateUtils;
16
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
17
import eu.dnetlib.msro.workers.aggregation.store.*;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.bson.Document;
21
import org.springframework.beans.factory.annotation.Autowired;
22
import org.springframework.stereotype.Component;
23

  
24
/**
25
 * Created by claudio on 13/03/2017.
26
 */
27
@Component
28
public class MDStoreDaoImpl implements MDStoreDao {
29

  
30
	private static final Log log = LogFactory.getLog(MDStoreDaoImpl.class);
31

  
32
	public static final String MD_ID = "mdId";
33
	public static final String FORMAT = "format";
34
	public static final String INTERPRETATION = "interpretation";
35
	public static final String LAYOUT = "layout";
36
	public static final String SIZE = "size";
37
	public static final String METADATA_NAME = "metadata";
38

  
39
	private boolean discardRecords = true;
40

  
41
	@Autowired
42
	private MongoDatabase db;
43

  
44
	@Autowired
45
	private MDStoreTransactionManager transactionManager;
46

  
47
	/**
48
	 * {@inheritDoc}
49
	 */
50
	@Override
51
	public void createMDStore(final String mdId, final String format, final String interpretation, final String layout) throws MDStoreException {
52
		
53
		transactionManager.createMDStore(mdId);
54
		final String internalId = transactionManager.getMDStoreCollection(mdId);
55

  
56
		final List<String> names = DnetStreamSupport.generateStreamFromIterator(db.listCollectionNames().iterator())
57
				.filter(c -> c.contains(internalId))
58
				.collect(Collectors.toList());
59

  
60
		if (names.isEmpty()) {
61
			log.info(String.format("creating collection %s", internalId));
62
			db.createCollection(internalId, new CreateCollectionOptions());
63
		}
64

  
65
		final BasicDBObject obj = new BasicDBObject();
66
		obj.put(MD_ID, mdId);
67
		obj.put(FORMAT, format);
68
		obj.put(INTERPRETATION, interpretation);
69
		obj.put(LAYOUT, layout);
70
		obj.put(SIZE, 0);
71
		getMetadataCollection().insertOne(obj);
72
	}
73

  
74
	/**
75
	 * {@inheritDoc}
76
	 */
77
	@Override
78
	public void deleteMDStore(final String mdId) throws MDStoreException {
79
		final MongoCollection<DBObject> metadata = getMetadataCollection();
80
		if (metadata == null) throw new MDStoreException("cannot find metadata collection");
81
		transactionManager.dropMDStore(mdId);
82
		metadata.deleteOne(new BasicDBObject(MD_ID, mdId));
83
		log.info("deleted mdId: " + mdId);
84
	}
85

  
86
	/**
87
	 * {@inheritDoc}
88
	 */
89
	@Override
90
	public MDStore getMDStore(final String mdId) throws MDStoreException {
91
		final String internalId = transactionManager.getMDStoreCollection(mdId);
92
		return new MongoMDStore(mdId, getRecordCollection(internalId), isDiscardRecords(), db);
93
	}
94

  
95
	/**
96
	 * {@inheritDoc}
97
	 */
98
	@Override
99
	public MDStore readMDStore(final String mdId) throws MDStoreException {
100
		final String internalId = transactionManager.readMdStore(mdId);
101
		return new MongoMDStore(mdId, getRecordCollection(internalId), isDiscardRecords(), db);
102
	}
103

  
104
	/**
105
	 * {@inheritDoc}
106
	 */
107
	@Override
108
	public MDStore startTransaction(final String mdId, final boolean refresh) throws MDStoreException {
109
		final String transactionId = transactionManager.startTransaction(mdId, refresh);
110
		return new MongoMDStore(transactionId, getRecordCollection(transactionId), isDiscardRecords(), db);
111
	}
112

  
113
	/**
114
	 * {@inheritDoc}
115
	 */
116
	@Override
117
	public List<MDStoreDescription> listMDStores() {
118
		return DnetStreamSupport.generateStreamFromIterator(getMetadataCollection().find().iterator())
119
				.map(this::convertMDStoreDescription)
120
				.collect(Collectors.toList());
121
	}
122

  
123
	private MongoCollection<Record> getRecordCollection(final String collectionId) {
124
		return db.getCollection(collectionId, Record.class);
125
	}
126

  
127
	private MongoCollection<DBObject> getMetadataCollection() {
128
		return db.getCollection(METADATA_NAME, DBObject.class);
129
	}
130

  
131
	private MDStoreDescription convertMDStoreDescription(DBObject input) {
132

  
133
		final String mdId = (String) input.get(MD_ID);
134
		log.debug("Getting info for " + mdId);
135
		final String format = (String) input.get(FORMAT);
136
		final String layout = (String) input.get(LAYOUT);
137
		final String interpretation = (String) input.get(INTERPRETATION);
138
		MongoMDStore currentMDStore = null;
139
		final MDStoreDescription description = new MDStoreDescription();
140
		try {
141
			currentMDStore = (MongoMDStore) getMDStore(mdId);
142
		} catch (final MDStoreException e) {
143
			log.error("Error on retrieving mdstore for getting info mdId " + mdId);
144
		}
145

  
146
		int size = 0;
147
		if (input.containsField(SIZE)) {
148
			log.debug("Size retrieved from metadata for mdId :" + mdId);
149
			size = (Integer) input.get(SIZE);
150
		} else {
151
			if (currentMDStore != null) {
152
				log.debug("Size not Found in metadata for mdId :" + mdId + " calling getCount ");
153
				size = currentMDStore.getSize();
154
				input.put("size", size);
155
				db.getCollection(METADATA_NAME, DBObject.class).findOneAndReplace(new BasicDBObject(MD_ID, mdId), input);
156
			}
157
		}
158
		if (currentMDStore != null) {
159
			description.setIndexed(currentMDStore.isIndexed());
160
		}
161
		description.setId(mdId);
162
		description.setFormat(format);
163
		description.setLayout(layout);
164
		description.setInterpretation(interpretation);
165
		description.setSize(size);
166
		return description;
167

  
168
	}
169

  
170
	/**
171
	 * {@inheritDoc}
172
	 */
173
	@Override
174
	public List<String> listMDStores(final String format, final String layout, final String interpretation) {
175
		return DnetStreamSupport.generateStreamFromIterator(getMetadataCollection().find().iterator())
176
				.filter(MDStoreUtils.dboFilter(format, layout, interpretation))
177
				.map(MDStoreUtils.mdId())
178
				.collect(Collectors.toList());
179
	}
180

  
181
	/**
182
	 * {@inheritDoc}
183
	 */
184
	@Override
185
	public int getCachedSize(final String id) throws MDStoreException {
186
		log.debug("retrieve cached size for mdstore: " + id);
187
		final DBObject desc = getMetadataCollection().find(new BasicDBObject(MD_ID, id)).first();
188
		if (!desc.containsField(SIZE)) {
189
			desc.put(SIZE, getMDStore(id).getSize());
190
		}
191

  
192
		final Object oSize = desc.get(SIZE);
193
		return (Integer) oSize;
194
	}
195

  
196
	/**
197
	 * {@inheritDoc}
198
	 */
199
	@Override
200
	public void refreshSizes() throws MDStoreException {
201
		for (final MDStoreDescription mdStoreId : listMDStores()) {
202
			refreshSize(mdStoreId.getId());
203
		}
204
	}
205

  
206
	/**
207
	 * {@inheritDoc}
208
	 */
209
	@Override
210
	public int refreshSize(final String mdStoreId) throws MDStoreException {
211
		final int size = (int) db.getCollection(transactionManager.getMDStoreCollection(mdStoreId)).count();
212
		final MongoCollection<DBObject> metadata = db.getCollection(METADATA_NAME, DBObject.class);
213
		metadata.updateOne(new BasicDBObject(MD_ID, mdStoreId), new BasicDBObject("$set", new BasicDBObject(SIZE, size)));
214
		return size;
215
	}
216

  
217
	@Override
218
	public int getSumOfSizes(final String format, final String layout, final String interpretation) throws MDStoreException {
219
		final MongoCollection<DBObject> metadata = getMetadataCollection();
220
		BasicDBObject matchObj = (BasicDBObject) BasicDBObjectBuilder.start("$match",
221
				BasicDBObjectBuilder.start("format", format).add("layout", layout).add("interpretation", interpretation).get()).get();
222
		BasicDBObject groupObj = (BasicDBObject) BasicDBObjectBuilder.start("$group",
223
				BasicDBObjectBuilder.start("_id", "").add("total", new BasicDBObject("$sum", "$" + SIZE)).get()).get();
224
		BasicDBObject projectObj = new BasicDBObject("$project", new BasicDBObject("_id", 0).append("total", 1));
225
		List<BasicDBObject> pipeline = Lists.newArrayList(matchObj, groupObj, projectObj);
226
		AggregateIterable<DBObject> output = metadata.aggregate(pipeline, DBObject.class);
227
		DBObject result = output.first();
228
		if (result == null || !result.containsField("total")) {
229
			log.debug("No total found");
230
			return 0;
231
		} else return (Integer) result.get("total");
232
	}
233

  
234
	/**
235
	 * {@inheritDoc}
236
	 */
237
	@Override
238
	public void commit(final String transactionId, final String mdId) throws MDStoreException {
239
		transactionManager.commit(transactionId, mdId, getMDStore(mdId));
240
	}
241
	/**
242
	 * Getter for property 'discardRecords'.
243
	 *
244
	 * @return Value for property 'discardRecords'.
245
	 */
246
	public boolean isDiscardRecords() {
247
		return discardRecords;
248
	}
249

  
250
	/**
251
	 * Setter for property 'discardRecords'.
252
	 *
253
	 * @param discardRecords Value to set for property 'discardRecords'.
254
	 */
255
	public void setDiscardRecords(final boolean discardRecords) {
256
		this.discardRecords = discardRecords;
257
	}
258

  
259
	@Override
260
	public MDStoreDBStatus getDBStatus() {
261
		final int handledDatastructures = Ints.saturatedCast(db.getCollection(METADATA_NAME).count());
262
		//final int usedDiskSpace = Ints.saturatedCast(db.getStats().getLong("storageSize") / (1024 * 1024)); // in MB
263
		//{ dbStats: 1, scale: 1 }
264
		BasicDBObject statsQuery = new BasicDBObject("dbStats", 1);
265
		statsQuery.put("scale", 1024 * 1024); //storageSize in MB
266
		final Document statsRes = db.runCommand(statsQuery);
267
		log.debug("DBStatus --  " + statsRes.toJson());
268
		int usedDiskSpace = 0;
269
		//trying to handle different versions of the mongo server: old version returns storage size as long, new version as double
270
		//TODO: simplify this when dev, beta, production are aligned with our local, latest, mongo version
271
		String usedDiskSpaceStr = statsRes.get("storageSize").toString();
272
		try {
273
			Long usedDiskSpaceLong = Long.parseLong(usedDiskSpaceStr);
274
			usedDiskSpace = Ints.saturatedCast(usedDiskSpaceLong);
275
		} catch (NumberFormatException nfe) {
276
			Double usedDiskSpaceDbl = Double.parseDouble(usedDiskSpaceStr);
277
			usedDiskSpace = usedDiskSpaceDbl.intValue();
278
		}
279
		final String date = DateUtils.now_ISO8601();
280
		return new MDStoreDBStatus(handledDatastructures, usedDiskSpace, date);
281
	}
282

  
283
	@Override
284
	public void startGarbage() throws MDStoreException {
285
		this.transactionManager.garbage();
286
	}
287

  
288
	@Override
289
	public void invalidTransaction(final String transactionId, final String mdId) throws MDStoreException {
290
		transactionManager.dropTransaction(mdId, transactionId);
291
	}
292

  
293
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/store/mongo/MongoBulkWritesManager.java
1
package eu.dnetlib.msro.workers.aggregation.store.mongo;
2

  
3
import java.util.List;
4

  
5
import com.google.common.collect.Lists;
6
import com.mongodb.BasicDBObject;
7
import com.mongodb.WriteConcern;
8
import com.mongodb.client.MongoCollection;
9
import com.mongodb.client.model.*;
10
import eu.dnetlib.msro.workers.aggregation.store.Record;
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14

  
15
public class MongoBulkWritesManager {
16

  
17
	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
18
	private final boolean discardRecords;
19
	private MongoCollection<Record> validCollection;
20
	private List<WriteModel<Record>> validBulkOperationList;
21

  
22
	private BulkWriteOptions writeOptions;
23
	private MongoCollection<Record> discardedCollection;
24
	private List<WriteModel<Record>> discardedBulkOperationList;
25

  
26
	private int bulkSize;
27

  
28
	public MongoBulkWritesManager(final MongoCollection<Record> collection,
29
			final MongoCollection<Record> discardedCollection,
30
			final int bulkSize,
31
			final boolean discardRecords) {
32
		this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
33
		this.validBulkOperationList = Lists.newArrayList();
34

  
35
		this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
36
		this.discardedBulkOperationList = Lists.newArrayList();
37

  
38
		this.bulkSize = bulkSize;
39
		this.discardRecords = discardRecords;
40
		this.writeOptions = new BulkWriteOptions().ordered(false);
41
	}
42

  
43
	public void insert(final Record r) {
44
		try {
45
			if (StringUtils.isNotBlank(r.getId())) {
46

  
47
				if (log.isDebugEnabled()) {
48
					log.debug("Saving object" + r.getId());
49
				}
50
				validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", r.getId()), r, new UpdateOptions().upsert(true)));
51
				if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) {
52
					validCollection.bulkWrite(validBulkOperationList, writeOptions);
53
					validBulkOperationList.clear();
54
				}
55
			} else {
56
				if (discardRecords) {
57
					log.debug("parsed record seems invalid");
58
					discardRecord(r);
59
				}
60
			}
61
		} catch (Throwable e) {
62
			if (discardRecords) {
63
				log.debug("unhandled exception: " + e.getMessage());
64
				discardRecord(r);
65
			}
66
		}
67
	}
68

  
69
	private void discardRecord(final Record record) {
70
		discardedBulkOperationList.add(new InsertOneModel(new BasicDBObject("body", record)));
71

  
72
		if (((discardedBulkOperationList.size() % bulkSize) == 0) && !discardedBulkOperationList.isEmpty()) {
73
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
74
			discardedBulkOperationList.clear();
75
		}
76
	}
77

  
78
	public void flushBulks() {
79
		//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
80
		//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
81
		if (!validBulkOperationList.isEmpty()) {
82
			validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
83
			validCollection.bulkWrite(validBulkOperationList, writeOptions);
84
		}
85
		if (!discardedBulkOperationList.isEmpty()) {
86
			discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
87
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
88
		}
89
		//setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode
90
		validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
91
		discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
92
	}
93

  
94
	private MongoCollection<Record> getCollectionWithWriteConcern(MongoCollection<Record> collection, WriteConcern writeConcern) {
95
		return collection.withWriteConcern(writeConcern);
96
	}
97

  
98
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/store/mongo/MongoDBConfiguration.java
1
package eu.dnetlib.msro.workers.aggregation.store.mongo;
2

  
3
import com.mongodb.MongoClient;
4
import com.mongodb.client.MongoDatabase;
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8
import org.springframework.context.annotation.Bean;
9
import org.springframework.context.annotation.Configuration;
10

  
11
/**
12
 * Created by claudio on 13/03/2017.
13
 */
14
@Configuration
15
public class MongoDBConfiguration {
16

  
17
	private static final Log log = LogFactory.getLog(MongoDBConfiguration.class);
18

  
19
	@Autowired
20
	private MongoDBProperties properties;
21

  
22
	@Bean
23
	public MongoClient mongoClient() throws Exception {
24
		log.info("creating MongoClient");
25
		return new MongoClient(properties.getHost(), properties.getPort());
26
	}
27

  
28
	@Bean
29
	public MongoDatabase db() throws Exception {
30
		log.info("creating MongoDatabase");
31
		return mongoClient().getDatabase(properties.getDbname());
32
	}
33

  
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff