Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.List;
4

    
5
import com.google.common.collect.Lists;
6
import com.google.common.primitives.Ints;
7
import com.mongodb.BasicDBObject;
8
import com.mongodb.BasicDBObjectBuilder;
9
import com.mongodb.DBObject;
10
import com.mongodb.client.AggregateIterable;
11
import com.mongodb.client.MongoCollection;
12
import com.mongodb.client.MongoDatabase;
13
import com.mongodb.client.model.CreateCollectionOptions;
14
import eu.dnetlib.data.mdstore.MDStoreServiceException;
15
import eu.dnetlib.data.mdstore.modular.MDStoreDescription;
16
import eu.dnetlib.data.mdstore.modular.RecordParser;
17
import eu.dnetlib.data.mdstore.modular.RecordParserFactory;
18
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
19
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDBStatus;
20
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
21
import eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager;
22
import eu.dnetlib.data.mdstore.modular.mongodb.utils.MDStoreUtils;
23
import eu.dnetlib.miscutils.collections.FilteredCollection;
24
import eu.dnetlib.miscutils.collections.MappedCollection;
25
import eu.dnetlib.miscutils.datetime.DateUtils;
26
import org.apache.commons.logging.Log;
27
import org.apache.commons.logging.LogFactory;
28
import org.bson.Document;
29
import org.springframework.beans.factory.annotation.Autowired;
30
import org.springframework.beans.factory.annotation.Required;
31

    
32
/**
33
 * Factory bean for MongoMDStore instances.
34
 *
35
 * @author marko
36
 */
37
public class MDStoreDaoImpl implements MDStoreDao {
38

    
39
	public static final String MD_ID = "mdId";
40
	public static final String FORMAT = "format";
41
	public static final String INTERPRETATION = "interpretation";
42
	public static final String LAYOUT = "layout";
43
	public static final String SIZE = "size";
44
	public static final String METADATA_NAME = "metadata";
45
	private static final Log log = LogFactory.getLog(MDStoreDaoImpl.class);
46
	private MongoDatabase db;
47

    
48
	private RecordParserFactory recordParserFactory;
49

    
50
	private boolean discardRecords = true;
51

    
52
	@Autowired
53
	private MDStoreTransactionManager transactionManager;
54

    
55
	/**
56
	 * {@inheritDoc}
57
	 */
58
	@Override
59
	public void createMDStore(final String mdId, final String format, final String interpretation, final String layout) throws MDStoreServiceException {
60
		transactionManager.createMDStore(mdId);
61
		final String internalId = transactionManager.getMDStoreCollection(mdId);
62

    
63
		if (!Lists.newArrayList(getDb().listCollectionNames()).contains(internalId)) {
64
			log.info(String.format("creating collection %s", internalId));
65
			getDb().createCollection(internalId, new CreateCollectionOptions());
66
		}
67
		final MongoCollection<DBObject> coll = getDb().getCollection(METADATA_NAME, DBObject.class);
68
		final BasicDBObject obj = new BasicDBObject();
69
		obj.put(MD_ID, mdId);
70
		obj.put(FORMAT, format);
71
		obj.put(INTERPRETATION, interpretation);
72
		obj.put(LAYOUT, layout);
73
		obj.put(SIZE, 0);
74
		coll.insertOne(obj);
75
	}
76

    
77
	/**
78
	 * {@inheritDoc}
79
	 */
80
	@Override
81
	public void deleteMDStore(final String mdId) throws MDStoreServiceException {
82
		final MongoCollection<DBObject> metadata = getDb().getCollection(METADATA_NAME, DBObject.class);
83
		if (metadata == null) throw new MDStoreServiceException("cannot find metadata collection");
84
		transactionManager.dropMDStore(mdId);
85
		metadata.deleteOne(new BasicDBObject(MD_ID, mdId));
86
		log.info("deleted mdId: " + mdId);
87
	}
88

    
89
	/**
90
	 * {@inheritDoc}
91
	 */
92
	@Override
93
	public MDStore getMDStore(final String mdId) throws MDStoreServiceException {
94
		final String internalId = transactionManager.getMDStoreCollection(mdId);
95
		return new MongoMDStore(mdId, getDb().getCollection(internalId, DBObject.class), getRecordParser(), isDiscardRecords(), getDb());
96
	}
97

    
98
	/**
99
	 * {@inheritDoc}
100
	 */
101
	@Override
102
	public MDStore readMDStore(final String mdId) throws MDStoreServiceException {
103
		final String internalId = transactionManager.readMdStore(mdId);
104
		return new MongoMDStore(mdId, getDb().getCollection(internalId, DBObject.class), getRecordParser(), isDiscardRecords(), getDb());
105
	}
106

    
107
	/**
108
	 * {@inheritDoc}
109
	 */
110
	@Override
111
	public MDStore startTransaction(final String mdId, final boolean refresh) throws MDStoreServiceException {
112
		final String transactionId = transactionManager.startTransaction(mdId, refresh);
113
		return new MongoMDStore(transactionId, getDb().getCollection(transactionId, DBObject.class), getRecordParser(), isDiscardRecords(),
114
				getDb());
115
	}
116

    
117
	private RecordParser getRecordParser() {
118
		final RecordParser parser = getRecordParserFactory().newInstance();
119
		parser.setTimestamp(DateUtils.now());
120
		return parser;
121
	}
122

    
123
	/**
124
	 * {@inheritDoc}
125
	 */
126
	@Override
127
	public List<MDStoreDescription> listMDStores() {
128
		return MappedCollection.listMap(getDb().getCollection(METADATA_NAME, DBObject.class).find(), input -> {
129

    
130
			final String mdId = (String) input.get(MD_ID);
131
			log.debug("Getting info for " + mdId);
132
			final String format = (String) input.get(FORMAT);
133
			final String layout = (String) input.get(LAYOUT);
134
			final String interpretation = (String) input.get(INTERPRETATION);
135
			MongoMDStore currentMDStore = null;
136
			final MDStoreDescription description = new MDStoreDescription();
137
			try {
138
				currentMDStore = (MongoMDStore) getMDStore(mdId);
139
			} catch (final MDStoreServiceException e) {
140
				log.error("Error on retrieving mdstore for getting info mdId " + mdId);
141
			}
142

    
143
			int size = 0;
144
			if (input.containsField(SIZE)) {
145
				log.debug("Size retrieved from metadata for mdId :" + mdId);
146
				size = (Integer) input.get(SIZE);
147
			} else {
148
				if (currentMDStore != null) {
149
					log.debug("Size not Found in metadata for mdId :" + mdId + " calling getCount ");
150
					size = currentMDStore.getSize();
151
					input.put("size", size);
152
					getDb().getCollection(METADATA_NAME, DBObject.class).findOneAndReplace(new BasicDBObject(MD_ID, mdId), input);
153
				}
154
			}
155
			if (currentMDStore != null) {
156
				description.setIndexed(currentMDStore.isIndexed());
157
			}
158
			description.setId(mdId);
159
			description.setFormat(format);
160
			description.setLayout(layout);
161
			description.setInterpretation(interpretation);
162
			description.setSize(size);
163
			return description;
164
		});
165
	}
166

    
167
	/**
168
	 * {@inheritDoc}
169
	 */
170
	@Override
171
	public List<String> listMDStores(final String format, final String layout, final String interpretation) {
172
		return MappedCollection.listMap(
173
				FilteredCollection.listFilter(getDb().getCollection(METADATA_NAME, DBObject.class).find(), MDStoreUtils.dboFilter(format, layout, interpretation)),
174
				MDStoreUtils.mdId());
175
	}
176

    
177
	/**
178
	 * {@inheritDoc}
179
	 */
180
	@Override
181
	public int getCachedSize(final String id) throws MDStoreServiceException {
182
		log.debug("retrieve cached size for mdstore: " + id);
183
		final DBObject desc = getDb().getCollection(METADATA_NAME, DBObject.class).find(new BasicDBObject(MD_ID, id)).first();
184
		if (!desc.containsField(SIZE)) {
185
			desc.put(SIZE, getMDStore(id).getSize());
186
		}
187

    
188
		final Object oSize = desc.get(SIZE);
189
		return (Integer) oSize;
190
	}
191

    
192
	/**
193
	 * {@inheritDoc}
194
	 */
195
	@Override
196
	public void refreshSizes() throws MDStoreServiceException {
197
		for (final MDStoreDescription mdStoreId : listMDStores()) {
198
			refreshSize(mdStoreId.getId());
199
		}
200
	}
201

    
202
	/**
203
	 * {@inheritDoc}
204
	 */
205
	@Override
206
	public int refreshSize(final String mdStoreId) throws MDStoreServiceException {
207
		final int size = (int) getDb().getCollection(transactionManager.getMDStoreCollection(mdStoreId)).count();
208
		final MongoCollection<DBObject> metadata = getDb().getCollection(METADATA_NAME, DBObject.class);
209
		metadata.updateOne(new BasicDBObject(MD_ID, mdStoreId), new BasicDBObject("$set", new BasicDBObject(SIZE, size)));
210
		return size;
211
	}
212

    
213
	@Override
214
	public int getSumOfSizes(final String format, final String layout, final String interpretation) throws MDStoreServiceException {
215
		final MongoCollection<DBObject> metadata = getDb().getCollection(METADATA_NAME, DBObject.class);
216
		BasicDBObject matchObj = (BasicDBObject) BasicDBObjectBuilder.start("$match",
217
				BasicDBObjectBuilder.start("format", format).add("layout", layout).add("interpretation", interpretation).get()).get();
218
		BasicDBObject groupObj = (BasicDBObject) BasicDBObjectBuilder.start("$group",
219
				BasicDBObjectBuilder.start("_id", "").add("total", new BasicDBObject("$sum", "$" + SIZE)).get()).get();
220
		BasicDBObject projectObj = new BasicDBObject("$project", new BasicDBObject("_id", 0).append("total", 1));
221
		List<BasicDBObject> pipeline = Lists.newArrayList(matchObj, groupObj, projectObj);
222
		AggregateIterable<DBObject> output = metadata.aggregate(pipeline, DBObject.class);
223
		DBObject result = output.first();
224
		if (result == null || !result.containsField("total")) {
225
			log.debug("No total found");
226
			return 0;
227
		} else return (Integer) result.get("total");
228
	}
229

    
230
	/**
231
	 * {@inheritDoc}
232
	 */
233
	@Override
234
	public void commit(final String transactionId, final String mdId) throws MDStoreServiceException {
235
		transactionManager.commit(transactionId, mdId, getMDStore(mdId));
236
	}
237

    
238
	/**
239
	 * Getter for property 'db'.
240
	 *
241
	 * @return Value for property 'db'.
242
	 */
243
	public MongoDatabase getDb() {
244
		return db;
245
	}
246

    
247
	/**
248
	 * Setter for property 'db'.
249
	 *
250
	 * @param db
251
	 *            Value to set for property 'db'.
252
	 */
253
	@Required
254
	public void setDb(final MongoDatabase db) {
255
		this.db = db;
256
	}
257

    
258
	/**
259
	 * Getter for property 'recordParser'.
260
	 *
261
	 * @return Value for property 'recordParser'.
262
	 */
263
	public RecordParserFactory getRecordParserFactory() {
264
		return recordParserFactory;
265
	}
266

    
267

    
268
	@Required
269
	public void setRecordParserFactory(final RecordParserFactory recordParserFactory) {
270
		this.recordParserFactory = recordParserFactory;
271
	}
272

    
273
	/**
274
	 * Getter for property 'discardRecords'.
275
	 *
276
	 * @return Value for property 'discardRecords'.
277
	 */
278
	public boolean isDiscardRecords() {
279
		return discardRecords;
280
	}
281

    
282
	/**
283
	 * Setter for property 'discardRecords'.
284
	 *
285
	 * @param discardRecords
286
	 *            Value to set for property 'discardRecords'.
287
	 */
288
	public void setDiscardRecords(final boolean discardRecords) {
289
		this.discardRecords = discardRecords;
290
	}
291

    
292
	@Override
293
	public MDStoreDBStatus getDBStatus() {
294
		final int handledDatastructures = Ints.saturatedCast(getDb().getCollection(METADATA_NAME).count());
295
		//final int usedDiskSpace = Ints.saturatedCast(getDb().getStats().getLong("storageSize") / (1024 * 1024)); // in MB
296
		//{ dbStats: 1, scale: 1 }
297
		BasicDBObject statsQuery = new BasicDBObject("dbStats", 1);
298
		statsQuery.put("scale", 1024 * 1024); //storageSize in MB
299
		final Document statsRes = getDb().runCommand(statsQuery);
300
		log.debug("DBStatus --  " + statsRes.toJson());
301
		int usedDiskSpace;
302
		//trying to handle different versions of the mongo server: old version returns storage size as long, new version as double
303
		//TODO: simplify this when dev, beta, production are aligned with our local, latest, mongo version
304
		String usedDiskSpaceStr = statsRes.get("storageSize").toString();
305
		try {
306
			Long usedDiskSpaceLong = Long.parseLong(usedDiskSpaceStr);
307
			usedDiskSpace = Ints.saturatedCast(usedDiskSpaceLong);
308
		} catch (NumberFormatException nfe) {
309
			Double usedDiskSpaceDbl = Double.parseDouble(usedDiskSpaceStr);
310
			usedDiskSpace = usedDiskSpaceDbl.intValue();
311
		}
312
		final String date = DateUtils.now_ISO8601();
313
		return new MDStoreDBStatus(handledDatastructures, usedDiskSpace, date);
314
	}
315

    
316
	@Override
317
	public void startGarbage() throws MDStoreServiceException {
318
		this.transactionManager.garbage();
319
	}
320

    
321
	@Override
322
	public void invalidTransaction(final String transactionId, final String mdId) throws MDStoreServiceException {
323
		transactionManager.dropTransaction(mdId, transactionId);
324

    
325
	}
326

    
327
}
(1-1/5)