Project

General

Profile

« Previous | Next » 

Revision 47863

integrated latest changes from dnet40

View differences:

MongoMDStore.java
1 1
package eu.dnetlib.data.mdstore.modular.mongodb;
2 2

  
3
import java.util.*;
4
import java.util.concurrent.*;
5
import java.util.regex.Pattern;
6

  
3 7
import com.google.common.base.Function;
8
import com.google.common.collect.Iterables;
4 9
import com.google.common.collect.Iterators;
5 10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
6 12
import com.mongodb.BasicDBObject;
7 13
import com.mongodb.DBObject;
8 14
import com.mongodb.QueryBuilder;
......
10 16
import com.mongodb.client.ListIndexesIterable;
11 17
import com.mongodb.client.MongoCollection;
12 18
import com.mongodb.client.MongoDatabase;
19
import com.mongodb.client.model.IndexOptions;
13 20
import eu.dnetlib.data.mdstore.DocumentNotFoundException;
21
import eu.dnetlib.data.mdstore.MDStoreServiceException;
14 22
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
15 23
import eu.dnetlib.data.mdstore.modular.RecordParser;
16 24
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
17 25
import eu.dnetlib.enabling.resultset.ResultSetListener;
18
import eu.dnetlib.miscutils.collections.MappedCollection;
19
import eu.dnetlib.miscutils.functional.UnaryFunction;
20 26
import org.apache.commons.lang.StringUtils;
21 27
import org.apache.commons.logging.Log;
22 28
import org.apache.commons.logging.LogFactory;
23 29
import org.bson.conversions.Bson;
24 30
import org.springframework.beans.factory.annotation.Required;
25 31

  
26
import java.util.ArrayList;
27
import java.util.Iterator;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.concurrent.*;
31
import java.util.regex.Pattern;
32

  
33 32
public class MongoMDStore implements MDStore {
34 33

  
35 34
	private static final int BULK_SIZE = 500;
36 35
	private static final Log log = LogFactory.getLog(MongoMDStore.class);
37
	private static List<String> requiredIndicies = Lists.newArrayList("{ \"id\" : 1}", "{ \"timestamp\" : 1}", "{ \"originalId\" : 1}");
36

  
38 37
	private final boolean discardRecords;
39 38
	private String id;
40 39
	private MongoDatabase mongoDatabase;
......
44 43

  
45 44
	private RecordParser recordParser;
46 45

  
46
	private static List<String> indices = Lists.newArrayList("id", "timestamp", "originalId");
47
	private final IndexOptions options = new IndexOptions().background(true);
48

  
47 49
	public MongoMDStore(final String id,
48 50
			final MongoCollection<DBObject> collection,
49 51
			final RecordParser recordParser,
......
69 71

  
70 72
		ensureIndices();
71 73

  
72
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
74
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
73 75
		final Object sentinel = new Object();
74 76
		int countStored = 0;
75
		final Callable<Integer> writer = new Callable<Integer>() {
76
			@Override
77
			public Integer call() throws Exception {
78
				final MongoBulkWritesManager bulkWritesManager =
79
						new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
80
				int count = 0;
81
				while (true) {
82
					try {
83
						final Object record = queue.take();
84
						if (record == sentinel) {
85
							bulkWritesManager.flushBulks();
86
							break;
87
						}
88
						count++;
89
						bulkWritesManager.insert((String) record);
90
					} catch (final InterruptedException e) {
91
						log.fatal("got exception in background thread", e);
92
						throw new IllegalStateException(e);
77
		final Callable<Integer> writer = () -> {
78
			final MongoBulkWritesManager bulkWritesManager =
79
					new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
80
			int count = 0;
81
			while (true) {
82
				try {
83
					final Object record = queue.take();
84
					if (record == sentinel) {
85
						bulkWritesManager.flushBulks();
86
						break;
93 87
					}
88
					count++;
89
					bulkWritesManager.insert((String) record);
90
				} catch (final InterruptedException e) {
91
					log.fatal("got exception in background thread", e);
92
					throw new IllegalStateException(e);
94 93
				}
95
				log.debug(String.format("extracted %s records from feeder queue", count));
96
				return count;
97 94
			}
95
			log.debug(String.format("extracted %s records from feeder queue", count));
96
			return count;
98 97
		};
99 98
		final ExecutorService executorService = Executors.newSingleThreadExecutor();
100 99
		Future<Integer> storedCountInt = executorService.submit(writer);
......
115 114
			log.error("Error on feeding mdstore with id:" + id, e);
116 115
			throw new IllegalStateException(e);
117 116
		}
118
		// double check
119
		ensureIndices();
120
		collection.createIndex(new BasicDBObject("id", 1));
121 117
		log.info("finished feeding mdstore " + id);
122 118
		return countStored;
123 119
	}
124 120

  
125 121
	public void ensureIndices() {
126
		for (final String key : Lists.newArrayList("id", "timestamp", "originalId")) {
127
			collection.createIndex(new BasicDBObject(key, 1));
122
		for (final String key : indices) {
123
			collection.createIndex(new BasicDBObject(key, 1), options);
128 124
		}
129 125
		if (mdformats != null) {
130 126
			for (final MDFormatDescription description : mdformats) {
131
				collection.createIndex(new BasicDBObject(description.getName(), 1));
127
				collection.createIndex(new BasicDBObject(description.getName(), 1), options);
132 128
			}
133 129
		}
134 130
	}
135 131

  
136 132
	public boolean isIndexed() {
137
		final ListIndexesIterable<DBObject> listIndexesIterable = collection.listIndexes(DBObject.class);
138
		return Lists.newArrayList(new MappedCollection<String, DBObject>(listIndexesIterable, new UnaryFunction<String, DBObject>() {
139

  
140
			@Override
141
			public String evaluate(final DBObject dbo) {
142
				return new BasicDBObject(dbo.toMap()).getString("key");
143
			}
144
		})).containsAll(requiredIndicies);
133
		final ListIndexesIterable<DBObject> found = collection.listIndexes(DBObject.class);
134
		return Sets.newHashSet(Iterables.transform(found, dbo -> {
135
			final Set<String> keyset = ((DBObject) dbo.get("key")).toMap().keySet();
136
			return Iterables.getFirst(keyset, "");
137
		})).containsAll(indices);
145 138
	}
146 139

  
147 140
	/**
......
168 161
	}
169 162

  
170 163
	@Override
171
	public ResultSetListener deliver(final String from, final String until, final String recordFilter) {
164
	public ResultSetListener deliver(final String from, final String until, final String recordFilter) throws MDStoreServiceException {
172 165
		return deliver(from, until, recordFilter, new SerializeMongoRecord());
173 166
	}
174 167

  
175 168
	@Override
176
	public ResultSetListener deliverIds(final String from, final String until, final String recordFilter) {
169
	public ResultSetListener deliverIds(final String from, final String until, final String recordFilter) throws MDStoreServiceException {
177 170
		return deliver(from, until, recordFilter, new SerializeMongoRecordId());
178 171
	}
179 172

  
180
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer) {
181
		ensureIndices();
182

  
173
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer)
174
			throws MDStoreServiceException {
183 175
		final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
184 176

  
185
		return new MongoResultSetListener(collection, filter, serializer);
177
		return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer);	}
178

  
179
	private Long parseLong(final String s) throws MDStoreServiceException {
180
		if (StringUtils.isBlank(s)) {
181
			return null;
182
		}
183
		try {
184
			return Long.valueOf(s);
185
		} catch (NumberFormatException e) {
186
			throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", e);
187
		}
186 188
	}
187 189

  
188 190
	@Override
189 191
	public Iterable<String> iterate() {
190
		return new Iterable<String>() {
191

  
192
			@Override
193
			public Iterator<String> iterator() {
194
				return Iterators.transform(collection.find().iterator(), new Function<DBObject, String>() {
195

  
196
					@Override
197
					public String apply(final DBObject arg) {
198
						return (String) arg.get("body");
199
					}
200
				});
201
			}
202
		};
192
		return () -> Iterators.transform(collection.find().iterator(), arg -> (String) arg.get("body"));
203 193
	}
204 194

  
205 195
	@Override

Also available in: Unified diff