Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.Iterator;
4
import java.util.List;
5
import java.util.concurrent.ArrayBlockingQueue;
6
import java.util.concurrent.BlockingQueue;
7
import java.util.regex.Pattern;
8

    
9
import org.apache.commons.lang.StringUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.bson.BasicBSONObject;
13
import org.springframework.beans.factory.annotation.Required;
14

    
15
import com.google.common.collect.Iterators;
16
import com.google.common.collect.Lists;
17
import com.mongodb.BasicDBObject;
18
import com.mongodb.DBCollection;
19
import com.mongodb.DBCursor;
20
import com.mongodb.DBObject;
21
import com.mongodb.QueryBuilder;
22

    
23
import eu.dnetlib.data.mdstore.DocumentNotFoundException;
24
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
25
import eu.dnetlib.data.mdstore.modular.RecordParser;
26
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
27
import eu.dnetlib.data.mdstore.modular.mongodb.utils.MDStoreUtils;
28
import eu.dnetlib.enabling.resultset.ResultSetListener;
29
import eu.dnetlib.miscutils.collections.MappedCollection;
30
import eu.dnetlib.miscutils.functional.UnaryFunction;
31

    
32
public class MongoMDStore implements MDStore {
33

    
34
	private static final int BULK_SIZE = 500;
35

    
36
	private static final class SerializeMongoRecord implements UnaryFunction<String, DBObject> {
37

    
38
		@Override
39
		public String evaluate(final DBObject arg) {
40
			return (String) arg.get("body");
41
		}
42
	}
43

    
44
	private static final class SerializeMongoRecordId implements UnaryFunction<String, DBObject> {
45

    
46
		@Override
47
		public String evaluate(final DBObject arg) {
48
			return (String) arg.get("id");
49
		}
50
	}
51

    
52
	private static final Log log = LogFactory.getLog(MongoMDStore.class);
53

    
54
	private String id;
55
	private DBCollection collection;
56
	private DBCollection discardedCollection;
57

    
58
	private RecordParser recordParser;
59

    
60
	private final boolean discardRecords;
61

    
62
	private static List<String> requiredIndicies = Lists.newArrayList("{ \"id\" : 1}", "{ \"timestamp\" : 1}", "{ \"originalId\" : 1}");
63

    
64
	public MongoMDStore(final String id, final DBCollection collection, final RecordParser recordParser, final boolean discardRecords) {
65
		this.id = id;
66
		this.collection = collection;
67
		this.discardedCollection = collection.getDB().getCollection("discarded-" + StringUtils.substringBefore(id, "_"));
68
		this.recordParser = recordParser;
69
		this.discardRecords = discardRecords;
70
	}
71

    
72
	@Override
73
	public void feed(final Iterable<String> records, final boolean incremental) {
74
		// TODO: remove incremental from MDStore API. It is used in MDStoreModular. Useless here.
75

    
76
		ensureIndices();
77

    
78
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
79
		final Object sentinel = new Object();
80
		final Thread background = new Thread(new Runnable() {
81

    
82
			@Override
83
			public void run() {
84
				MongoBulkWritesManager bulkWritesManager = new MongoBulkWritesManager(collection, discardedCollection, BULK_SIZE, recordParser, discardRecords);
85
				while (true) {
86
					try {
87
						final Object record = queue.take();
88
						if (record == sentinel) {
89
							bulkWritesManager.flushBulks();
90
							break;
91
						}
92
						bulkWritesManager.insert((String) record);
93
					} catch (final InterruptedException e) {
94
						log.fatal("got exception in background thread", e);
95
						throw new IllegalStateException(e);
96
					}
97
				}
98
			}
99
		});
100

    
101
		background.start();
102
		try {
103
			log.info("feeding mdstore " + id);
104
			if (records != null) {
105
				for (final String record : records) {
106
					queue.put(record);
107
				}
108
			}
109
			queue.put(sentinel);
110
			log.info("finished feeding mdstore " + id);
111

    
112
			background.join();
113
		} catch (final InterruptedException e) {
114
			throw new IllegalStateException(e);
115
		}
116
		// double check
117
		ensureIndices();
118
		collection.ensureIndex(new BasicDBObject("id", 1));
119
	}
120

    
121
	public void ensureIndices() {
122
		collection.resetIndexCache();
123
		for (final String key : Lists.newArrayList("id", "timestamp", "originalId")) {
124
			collection.ensureIndex(new BasicDBObject(key, 1));
125
		}
126
	}
127

    
128
	public boolean isIndexed() {
129
		return Lists.newArrayList(new MappedCollection<String, DBObject>(collection.getIndexInfo(), new UnaryFunction<String, DBObject>() {
130

    
131
			@Override
132
			public String evaluate(final DBObject dbo) {
133
				return new BasicBSONObject(dbo.toMap()).getString("key");
134
			}
135
		})).containsAll(requiredIndicies);
136
	}
137

    
138
	/**
139
	 * Method searches for the given string grep into this collection and replaces it with the given replacement.
140
	 *
141
	 * @param grep
142
	 *            the string to search
143
	 * @param replace
144
	 *            the replacement
145
	 */
146
	public void replace(final String grep, final String replace) {
147
		final Pattern regex = Pattern.compile(grep, Pattern.MULTILINE);
148
		final DBCursor matches = collection.find(QueryBuilder.start("body").regex(regex).get());
149

    
150
		log.debug("FOUND: " + matches.size());
151

    
152
		for (final DBObject match : matches) {
153
			final DBObject o = new BasicDBObject(match.toMap());
154
			o.put("body", regex.matcher((String) match.get("body")).replaceAll(replace));
155
			collection.update(match, o);
156
		}
157
	}
158

    
159
	@Override
160
	public ResultSetListener deliver(final String from, final String until, final String recordFilter) {
161
		return deliver(from, until, recordFilter, new SerializeMongoRecord());
162
	}
163

    
164
	@Override
165
	public ResultSetListener deliverIds(final String from, final String until, final String recordFilter) {
166
		return deliver(from, until, recordFilter, new SerializeMongoRecordId());
167
	}
168

    
169
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final UnaryFunction<String, DBObject> serializer) {
170
		ensureIndices();
171

    
172
		final Pattern filter = recordFilter != null && recordFilter.length() > 0 ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
173

    
174
		return new MongoResultSetListener(collection, filter, serializer);
175
	}
176

    
177
	@Override
178
	public Iterable<String> iterate() {
179
		return new Iterable<String>() {
180

    
181
			@Override
182
			public Iterator<String> iterator() {
183
				return Iterators.transform(collection.find(), MDStoreUtils.body());
184
			}
185
		};
186
	}
187

    
188
	@Override
189
	public void deleteRecord(final String recordId) {
190
		collection.remove(new BasicDBObject("id", recordId));
191
	}
192

    
193
	@Override
194
	public String getRecord(final String recordId) throws DocumentNotFoundException {
195
		final DBObject obj = collection.findOne(new BasicDBObject("id", recordId));
196
		if (obj == null || obj.containsField("body") == false) { throw new DocumentNotFoundException(String.format(
197
				"The document with id '%s' does not exist in mdstore: '%s'", recordId, id)); }
198
		final String body = (String) obj.get("body");
199
		if (body.trim().length() == 0) { throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'",
200
				recordId, id)); }
201
		return new SerializeMongoRecord().evaluate(obj);
202
	}
203

    
204
	@Override
205
	public void truncate() {
206
		collection.drop();
207
		discardedCollection.drop();
208
	}
209

    
210
	public DBObject getMDStoreMetadata() {
211
		return collection.getDB().getCollection("metadata").findOne(new BasicDBObject("mdId", getId()));
212
	}
213

    
214
	@Override
215
	public String getFormat() {
216
		return (String) getMDStoreMetadata().get("format");
217
	}
218

    
219
	@Override
220
	public String getInterpretation() {
221
		return (String) getMDStoreMetadata().get("interpretation");
222
	}
223

    
224
	@Override
225
	public String getLayout() {
226
		return (String) getMDStoreMetadata().get("layout");
227
	}
228

    
229
	@Override
230
	public String getId() {
231
		return id;
232
	}
233

    
234
	public void setId(final String id) {
235
		this.id = id;
236
	}
237

    
238
	public DBCollection getCollection() {
239
		return collection;
240
	}
241

    
242
	public void setCollection(final DBCollection collection) {
243
		this.collection = collection;
244
	}
245

    
246
	public RecordParser getRecordParser() {
247
		return recordParser;
248
	}
249

    
250
	@Required
251
	public void setRecordParser(final RecordParser recordParser) {
252
		this.recordParser = recordParser;
253
	}
254

    
255
	@Override
256
	public int getSize() {
257
		return (int) collection.getCount();
258
	}
259

    
260
	public DBCollection getDiscardedCollection() {
261
		return discardedCollection;
262
	}
263

    
264
	public void setDiscardedCollection(final DBCollection discardedCollection) {
265
		this.discardedCollection = discardedCollection;
266
	}
267

    
268
	@Override
269
	public void feed(final Iterable<String> records, final boolean incremental, final List<MDFormatDescription> mdformats) {
270
		// TODO Auto-generated method stub
271

    
272
	}
273

    
274
}
(4-4/5)