Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.*;
4
import java.util.concurrent.*;
5
import java.util.regex.Pattern;
6

    
7
import com.google.common.base.Function;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Iterators;
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
12
import com.mongodb.BasicDBObject;
13
import com.mongodb.DBObject;
14
import com.mongodb.QueryBuilder;
15
import com.mongodb.client.FindIterable;
16
import com.mongodb.client.ListIndexesIterable;
17
import com.mongodb.client.MongoCollection;
18
import com.mongodb.client.MongoDatabase;
19
import com.mongodb.client.model.IndexOptions;
20
import eu.dnetlib.data.mdstore.DocumentNotFoundException;
21
import eu.dnetlib.data.mdstore.MDStoreServiceException;
22
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
23
import eu.dnetlib.data.mdstore.modular.RecordParser;
24
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
25
import eu.dnetlib.enabling.resultset.ResultSetListener;
26
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
27
import org.apache.commons.lang3.StringUtils;
28
import org.apache.commons.logging.Log;
29
import org.apache.commons.logging.LogFactory;
30
import org.bson.conversions.Bson;
31
import org.springframework.beans.factory.annotation.Required;
32

    
33
public class MongoMDStore implements MDStore {
34

    
35
	private static final int BULK_SIZE = 500;
36
	private static final Log log = LogFactory.getLog(MongoMDStore.class);
37

    
38
	private final boolean discardRecords;
39
	private String id;
40
	private MongoDatabase mongoDatabase;
41
	private MongoCollection<DBObject> collection;
42
	private MongoCollection<DBObject> discardedCollection;
43
	private List<MDFormatDescription> mdformats;
44

    
45
	private RecordParser recordParser;
46

    
47
	private static List<String> indices = Lists.newArrayList("id", "timestamp", "originalId");
48
	private final IndexOptions options = new IndexOptions().background(true);
49

    
50
	public MongoMDStore(final String id,
51
			final MongoCollection<DBObject> collection,
52
			final RecordParser recordParser,
53
			final boolean discardRecords,
54
			final MongoDatabase mongoDatabase) {
55
		this.id = id;
56
		this.mongoDatabase = mongoDatabase;
57
		this.collection = collection;
58
		this.discardedCollection = this.mongoDatabase.getCollection("discarded-" + StringUtils.substringBefore(id, "_"), DBObject.class);
59
		this.recordParser = recordParser;
60
		this.discardRecords = discardRecords;
61
	}
62

    
63
	@Override
64
	public int feed(final Iterable<String> records, final boolean incremental, final List<MDFormatDescription> mdformats) {
65
		this.mdformats = mdformats;
66
		return feed(records, incremental);
67
	}
68

    
69
	@Override
70
	public int feed(final Iterable<String> records, final boolean incremental) {
71
		// TODO: remove incremental from MDStore API. It is used in MDStoreModular. Useless here.
72

    
73
		ensureIndices();
74

    
75
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
76
		final Object sentinel = new Object();
77
		int countStored = 0;
78
		final Callable<Integer> writer = () -> {
79
			final MongoBulkWritesManager bulkWritesManager =
80
					new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
81
			int count = 0;
82
			while (true) {
83
				try {
84
					final Object record = queue.take();
85
					if(record == ResultSetClient.SENTINEL_PROC_CANCELED){
86
						//it means the wf was cancelled, so we must give this information back to the caller to avoid commit
87
						log.warn(String.format("Process cancelled after %d records, returning -1", count));
88
						return -1;
89
					}
90
					if (record == sentinel) {
91
						bulkWritesManager.flushBulks();
92
						break;
93
					}
94
					count++;
95
					bulkWritesManager.insert((String) record);
96
				} catch (final InterruptedException e) {
97
					log.fatal("got exception in background thread", e);
98
					throw new IllegalStateException(e);
99
				}
100
			}
101
			log.debug(String.format("extracted %s records from feeder queue", count));
102
			return count;
103
		};
104
		final ExecutorService executorService = Executors.newSingleThreadExecutor();
105
		Future<Integer> storedCountInt = executorService.submit(writer);
106

    
107
		try {
108
			log.info("feeding mdstore " + id);
109
			if (records != null) {
110
				for (final String record : records) {
111
					queue.put(record);
112
				}
113
			}
114
			queue.put(sentinel);
115
			countStored = storedCountInt.get().intValue();
116
		} catch (final InterruptedException e) {
117
			log.error("Error on feeding mdstore with id:" + id, e);
118
			throw new IllegalStateException(e);
119
		} catch (ExecutionException e) {
120
			log.error("Error on feeding mdstore with id:" + id, e);
121
			throw new IllegalStateException(e);
122
		}
123
		log.info("finished feeding mdstore " + id);
124
		return countStored;
125
	}
126

    
127
	public void ensureIndices() {
128
		for (final String key : indices) {
129
			collection.createIndex(new BasicDBObject(key, 1), options);
130
		}
131
		if (mdformats != null) {
132
			for (final MDFormatDescription description : mdformats) {
133
				collection.createIndex(new BasicDBObject(description.getName(), 1), options);
134
			}
135
		}
136
	}
137

    
138
	public boolean isIndexed() {
139
		final ListIndexesIterable<DBObject> found = collection.listIndexes(DBObject.class);
140
		return Sets.newHashSet(Iterables.transform(found, dbo -> {
141
			final Set<String> keyset = ((DBObject) dbo.get("key")).toMap().keySet();
142
			return Iterables.getFirst(keyset, "");
143
		})).containsAll(indices);
144
	}
145

    
146
	/**
147
	 * Method searches for the given string grep into this collection and replaces it with the given replacement.
148
	 *
149
	 * @param grep
150
	 *            the string to search
151
	 * @param replace
152
	 *            the replacement
153
	 */
154
	public void replace(final String grep, final String replace) {
155
		final Pattern regex = Pattern.compile(grep, Pattern.MULTILINE);
156
		BasicDBObject query = (BasicDBObject) QueryBuilder.start("body").regex(regex).get();
157
		final FindIterable<DBObject> matches = collection.find(query, DBObject.class);
158
		//final DBCursor matches = collection.find(QueryBuilder.start("body").regex(regex).get());
159
		if (log.isDebugEnabled())
160
			log.debug("FOUND: " + Lists.newArrayList(matches).size());
161

    
162
		for (final DBObject match : matches) {
163
			final DBObject o = new BasicDBObject(match.toMap());
164
			o.put("body", regex.matcher((String) match.get("body")).replaceAll(replace));
165
			collection.findOneAndReplace(new BasicDBObject("_id", o.get("_id")), o);
166
		}
167
	}
168

    
169
	@Override
170
	public ResultSetListener deliver(final String from, final String until, final String recordFilter) throws MDStoreServiceException {
171
		return deliver(from, until, recordFilter, new SerializeMongoRecord());
172
	}
173

    
174
	@Override
175
	public ResultSetListener deliverIds(final String from, final String until, final String recordFilter) throws MDStoreServiceException {
176
		return deliver(from, until, recordFilter, new SerializeMongoRecordId());
177
	}
178

    
179
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer)
180
			throws MDStoreServiceException {
181
		final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
182

    
183
		return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer);	}
184

    
185
	private Long parseLong(final String s) throws MDStoreServiceException {
186
		if (StringUtils.isBlank(s)) {
187
			return null;
188
		}
189
		try {
190
			return Long.valueOf(s);
191
		} catch (NumberFormatException e) {
192
			throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", e);
193
		}
194
	}
195

    
196
	@Override
197
	public Iterable<String> iterate() {
198
		return () -> Iterators.transform(collection.find().iterator(), arg -> (String) arg.get("body"));
199
	}
200

    
201
	@Override
202
	public void deleteRecord(final String recordId) {
203
		collection.deleteOne(new BasicDBObject("id", recordId));
204
	}
205

    
206
	@Override
207
	public String getRecord(final String recordId) throws DocumentNotFoundException {
208
		final DBObject obj = collection.find(new BasicDBObject("id", recordId)).first();
209
		if (obj == null || !obj.containsField("body")) throw new DocumentNotFoundException(String.format(
210
				"The document with id '%s' does not exist in mdstore: '%s'", recordId, id));
211
		final String body = (String) obj.get("body");
212
		if (body.trim().length() == 0) throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'",
213
				recordId, id));
214
		return new SerializeMongoRecord().apply(obj);
215
	}
216

    
217
	@Override
218
	public List<String> deliver(final String mdId, final int pageSize, final int offset, final Map<String, String> queryParam) {
219
		final QueryBuilder query = QueryBuilder.start();
220

    
221
		for (String key : queryParam.keySet()) {
222
			query.and(key).regex(Pattern.compile(queryParam.get(key), Pattern.LITERAL));
223
		}
224

    
225
		FindIterable<DBObject> dbObjects = offset > 0
226
				? collection.find((Bson) query.get()).limit(pageSize).skip(offset)
227
				: collection.find((Bson) query.get()).limit(pageSize);
228

    
229

    
230
		queryParam.put("count", "" + collection.count((Bson) query.get()));
231

    
232
		final List<String> result = new ArrayList<>();
233
		for (final DBObject item : dbObjects) {
234
			result.add(item.get("body").toString());
235
		}
236

    
237
		return result;
238
	}
239

    
240
	@Override
241
	public void truncate() {
242
		collection.drop();
243
		discardedCollection.drop();
244
	}
245

    
246
	public DBObject getMDStoreMetadata() {
247
		return mongoDatabase.getCollection("metadata", DBObject.class).find(new BasicDBObject("mdId", getId())).first();
248
	}
249

    
250
	@Override
251
	public String getFormat() {
252
		return (String) getMDStoreMetadata().get("format");
253
	}
254

    
255
	@Override
256
	public String getInterpretation() {
257
		return (String) getMDStoreMetadata().get("interpretation");
258
	}
259

    
260
	@Override
261
	public String getLayout() {
262
		return (String) getMDStoreMetadata().get("layout");
263
	}
264

    
265
	@Override
266
	public String getId() {
267
		return id;
268
	}
269

    
270
	public void setId(final String id) {
271
		this.id = id;
272
	}
273

    
274
	public MongoCollection<DBObject> getCollection() {
275
		return collection;
276
	}
277

    
278
	public void setCollection(final MongoCollection<DBObject> collection) {
279
		this.collection = collection;
280
	}
281

    
282
	public RecordParser getRecordParser() {
283
		return recordParser;
284
	}
285

    
286
	@Required
287
	public void setRecordParser(final RecordParser recordParser) {
288
		this.recordParser = recordParser;
289
	}
290

    
291
	@Override
292
	public int getSize() {
293
		return (int) collection.count();
294
	}
295

    
296
	public MongoCollection<DBObject> getDiscardedCollection() {
297
		return discardedCollection;
298
	}
299

    
300
	public void setDiscardedCollection(final MongoCollection<DBObject> discardedCollection) {
301
		this.discardedCollection = discardedCollection;
302
	}
303

    
304
	private class SerializeMongoRecord implements Function<DBObject, String> {
305

    
306
		@Override
307
		public String apply(final DBObject arg) {
308
			return (String) arg.get("body");
309
		}
310
	}
311

    
312
	private class SerializeMongoRecordId implements Function<DBObject, String> {
313

    
314
		@Override
315
		public String apply(final DBObject arg) {
316
			return (String) arg.get("id");
317
		}
318
	}
319

    
320
}
(4-4/5)