Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.*;
4
import java.util.concurrent.*;
5
import java.util.regex.Pattern;
6

    
7
import com.google.common.base.Function;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Iterators;
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
12
import com.mongodb.BasicDBObject;
13
import com.mongodb.DBObject;
14
import com.mongodb.QueryBuilder;
15
import com.mongodb.client.FindIterable;
16
import com.mongodb.client.ListIndexesIterable;
17
import com.mongodb.client.MongoCollection;
18
import com.mongodb.client.MongoDatabase;
19
import com.mongodb.client.model.IndexOptions;
20
import eu.dnetlib.data.mdstore.DocumentNotFoundException;
21
import eu.dnetlib.data.mdstore.MDStoreServiceException;
22
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
23
import eu.dnetlib.data.mdstore.modular.RecordParser;
24
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
25
import eu.dnetlib.enabling.resultset.ResultSetListener;
26
import org.apache.commons.lang3.StringUtils;
27
import org.apache.commons.logging.Log;
28
import org.apache.commons.logging.LogFactory;
29
import org.bson.conversions.Bson;
30
import org.springframework.beans.factory.annotation.Required;
31

    
32
public class MongoMDStore implements MDStore {
33

    
34
	private static final int BULK_SIZE = 500;
35
	private static final Log log = LogFactory.getLog(MongoMDStore.class);
36

    
37
	private final boolean discardRecords;
38
	private String id;
39
	private MongoDatabase mongoDatabase;
40
	private MongoCollection<DBObject> collection;
41
	private MongoCollection<DBObject> discardedCollection;
42
	private List<MDFormatDescription> mdformats;
43

    
44
	private RecordParser recordParser;
45

    
46
	private static List<String> indices = Lists.newArrayList("id", "timestamp", "originalId");
47
	private final IndexOptions options = new IndexOptions().background(true);
48

    
49
	public MongoMDStore(final String id,
50
			final MongoCollection<DBObject> collection,
51
			final RecordParser recordParser,
52
			final boolean discardRecords,
53
			final MongoDatabase mongoDatabase) {
54
		this.id = id;
55
		this.mongoDatabase = mongoDatabase;
56
		this.collection = collection;
57
		this.discardedCollection = this.mongoDatabase.getCollection("discarded-" + StringUtils.substringBefore(id, "_"), DBObject.class);
58
		this.recordParser = recordParser;
59
		this.discardRecords = discardRecords;
60
	}
61

    
62
	@Override
63
	public int feed(final Iterable<String> records, final boolean incremental, final List<MDFormatDescription> mdformats) {
64
		this.mdformats = mdformats;
65
		return feed(records, incremental);
66
	}
67

    
68
	@Override
69
	public int feed(final Iterable<String> records, final boolean incremental) {
70
		// TODO: remove incremental from MDStore API. It is used in MDStoreModular. Useless here.
71

    
72
		ensureIndices();
73

    
74
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
75
		final Object sentinel = new Object();
76
		int countStored = 0;
77
		final Callable<Integer> writer = () -> {
78
			final MongoBulkWritesManager bulkWritesManager =
79
					new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
80
			int count = 0;
81
			while (true) {
82
				try {
83
					final Object record = queue.take();
84
					if (record == sentinel) {
85
						bulkWritesManager.flushBulks();
86
						break;
87
					}
88
					count++;
89
					bulkWritesManager.insert((String) record);
90
				} catch (final InterruptedException e) {
91
					log.fatal("got exception in background thread", e);
92
					throw new IllegalStateException(e);
93
				}
94
			}
95
			log.debug(String.format("extracted %s records from feeder queue", count));
96
			return count;
97
		};
98
		final ExecutorService executorService = Executors.newSingleThreadExecutor();
99
		Future<Integer> storedCountInt = executorService.submit(writer);
100

    
101
		try {
102
			log.info("feeding mdstore " + id);
103
			if (records != null) {
104
				for (final String record : records) {
105
					queue.put(record);
106
				}
107
			}
108
			queue.put(sentinel);
109
			countStored = storedCountInt.get().intValue();
110
		} catch (final InterruptedException e) {
111
			log.error("Error on feeding mdstore with id:" + id, e);
112
			throw new IllegalStateException(e);
113
		} catch (ExecutionException e) {
114
			log.error("Error on feeding mdstore with id:" + id, e);
115
			throw new IllegalStateException(e);
116
		}
117
		log.info("finished feeding mdstore " + id);
118
		return countStored;
119
	}
120

    
121
	public void ensureIndices() {
122
		for (final String key : indices) {
123
			collection.createIndex(new BasicDBObject(key, 1), options);
124
		}
125
		if (mdformats != null) {
126
			for (final MDFormatDescription description : mdformats) {
127
				collection.createIndex(new BasicDBObject(description.getName(), 1), options);
128
			}
129
		}
130
	}
131

    
132
	public boolean isIndexed() {
133
		final ListIndexesIterable<DBObject> found = collection.listIndexes(DBObject.class);
134
		return Sets.newHashSet(Iterables.transform(found, dbo -> {
135
			final Set<String> keyset = ((DBObject) dbo.get("key")).toMap().keySet();
136
			return Iterables.getFirst(keyset, "");
137
		})).containsAll(indices);
138
	}
139

    
140
	/**
141
	 * Method searches for the given string grep into this collection and replaces it with the given replacement.
142
	 *
143
	 * @param grep
144
	 *            the string to search
145
	 * @param replace
146
	 *            the replacement
147
	 */
148
	public void replace(final String grep, final String replace) {
149
		final Pattern regex = Pattern.compile(grep, Pattern.MULTILINE);
150
		BasicDBObject query = (BasicDBObject) QueryBuilder.start("body").regex(regex).get();
151
		final FindIterable<DBObject> matches = collection.find(query, DBObject.class);
152
		//final DBCursor matches = collection.find(QueryBuilder.start("body").regex(regex).get());
153
		if (log.isDebugEnabled())
154
			log.debug("FOUND: " + Lists.newArrayList(matches).size());
155

    
156
		for (final DBObject match : matches) {
157
			final DBObject o = new BasicDBObject(match.toMap());
158
			o.put("body", regex.matcher((String) match.get("body")).replaceAll(replace));
159
			collection.findOneAndReplace(new BasicDBObject("_id", o.get("_id")), o);
160
		}
161
	}
162

    
163
	@Override
164
	public ResultSetListener deliver(final String from, final String until, final String recordFilter) throws MDStoreServiceException {
165
		return deliver(from, until, recordFilter, new SerializeMongoRecord());
166
	}
167

    
168
	@Override
169
	public ResultSetListener deliverIds(final String from, final String until, final String recordFilter) throws MDStoreServiceException {
170
		return deliver(from, until, recordFilter, new SerializeMongoRecordId());
171
	}
172

    
173
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer)
174
			throws MDStoreServiceException {
175
		final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
176

    
177
		return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer);	}
178

    
179
	private Long parseLong(final String s) throws MDStoreServiceException {
180
		if (StringUtils.isBlank(s)) {
181
			return null;
182
		}
183
		try {
184
			return Long.valueOf(s);
185
		} catch (NumberFormatException e) {
186
			throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", e);
187
		}
188
	}
189

    
190
	@Override
191
	public Iterable<String> iterate() {
192
		return () -> Iterators.transform(collection.find().iterator(), arg -> (String) arg.get("body"));
193
	}
194

    
195
	@Override
196
	public void deleteRecord(final String recordId) {
197
		collection.deleteOne(new BasicDBObject("id", recordId));
198
	}
199

    
200
	@Override
201
	public String getRecord(final String recordId) throws DocumentNotFoundException {
202
		final DBObject obj = collection.find(new BasicDBObject("id", recordId)).first();
203
		if (obj == null || !obj.containsField("body")) throw new DocumentNotFoundException(String.format(
204
				"The document with id '%s' does not exist in mdstore: '%s'", recordId, id));
205
		final String body = (String) obj.get("body");
206
		if (body.trim().length() == 0) throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'",
207
				recordId, id));
208
		return new SerializeMongoRecord().apply(obj);
209
	}
210

    
211
	@Override
212
	public List<String> deliver(final String mdId, final int pageSize, final int offset, final Map<String, String> queryParam) {
213
		final QueryBuilder query = QueryBuilder.start();
214

    
215
		for (String key : queryParam.keySet()) {
216
			query.and(key).regex(Pattern.compile(queryParam.get(key), Pattern.LITERAL));
217
		}
218

    
219
		FindIterable<DBObject> dbObjects = offset > 0
220
				? collection.find((Bson) query.get()).limit(pageSize).skip(offset)
221
				: collection.find((Bson) query.get()).limit(pageSize);
222

    
223

    
224
		queryParam.put("count", "" + collection.count((Bson) query.get()));
225

    
226
		final List<String> result = new ArrayList<>();
227
		for (final DBObject item : dbObjects) {
228
			result.add(item.get("body").toString());
229
		}
230

    
231
		return result;
232
	}
233

    
234
	@Override
235
	public void truncate() {
236
		collection.drop();
237
		discardedCollection.drop();
238
	}
239

    
240
	public DBObject getMDStoreMetadata() {
241
		return mongoDatabase.getCollection("metadata", DBObject.class).find(new BasicDBObject("mdId", getId())).first();
242
	}
243

    
244
	@Override
245
	public String getFormat() {
246
		return (String) getMDStoreMetadata().get("format");
247
	}
248

    
249
	@Override
250
	public String getInterpretation() {
251
		return (String) getMDStoreMetadata().get("interpretation");
252
	}
253

    
254
	@Override
255
	public String getLayout() {
256
		return (String) getMDStoreMetadata().get("layout");
257
	}
258

    
259
	@Override
260
	public String getId() {
261
		return id;
262
	}
263

    
264
	public void setId(final String id) {
265
		this.id = id;
266
	}
267

    
268
	public MongoCollection<DBObject> getCollection() {
269
		return collection;
270
	}
271

    
272
	public void setCollection(final MongoCollection<DBObject> collection) {
273
		this.collection = collection;
274
	}
275

    
276
	public RecordParser getRecordParser() {
277
		return recordParser;
278
	}
279

    
280
	@Required
281
	public void setRecordParser(final RecordParser recordParser) {
282
		this.recordParser = recordParser;
283
	}
284

    
285
	@Override
286
	public int getSize() {
287
		return (int) collection.count();
288
	}
289

    
290
	public MongoCollection<DBObject> getDiscardedCollection() {
291
		return discardedCollection;
292
	}
293

    
294
	public void setDiscardedCollection(final MongoCollection<DBObject> discardedCollection) {
295
		this.discardedCollection = discardedCollection;
296
	}
297

    
298
	private class SerializeMongoRecord implements Function<DBObject, String> {
299

    
300
		@Override
301
		public String apply(final DBObject arg) {
302
			return (String) arg.get("body");
303
		}
304
	}
305

    
306
	private class SerializeMongoRecordId implements Function<DBObject, String> {
307

    
308
		@Override
309
		public String apply(final DBObject arg) {
310
			return (String) arg.get("id");
311
		}
312
	}
313

    
314
}
(4-4/5)