Project

General

Profile

« Previous | Next » 

Revision 47863

Integrated latest changes from dnet40.

View differences:

modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MDStoreTransactionManagerImpl.java
15 15
import com.mongodb.client.MongoDatabase;
16 16
import com.mongodb.client.MongoIterable;
17 17
import com.mongodb.client.model.Filters;
18
import com.mongodb.client.model.IndexOptions;
18 19
import com.mongodb.client.model.UpdateOptions;
19 20
import eu.dnetlib.data.mdstore.MDStoreServiceException;
20 21
import eu.dnetlib.data.mdstore.modular.connector.*;
......
56 57
	/** The expired days. */
57 58
	private int expiredDays;
58 59

  
60
	private final IndexOptions options = new IndexOptions().background(true);
61

  
59 62
	/**
60 63
	 * Bootstrap manager.
61 64
	 */
......
82 85
		final BasicDBObject ensureIndex = new BasicDBObject();
83 86
		ensureIndex.put("mdId", 1);
84 87
		log.debug("Create index in MetadaManager ");
85
		this.getManagerTable().createIndex(ensureIndex);
88
		this.getManagerTable().createIndex(ensureIndex, options);
86 89
	}
87 90

  
88 91
	/**
......
659 662
	private void updateIncremental(final String transactionId, final String currentId) {
660 663
		final MongoCollection<DBObject> transaction = db.getCollection(transactionId, DBObject.class);
661 664
		final MongoCollection<DBObject> mdstore = db.getCollection(currentId, DBObject.class);
662
		final FindIterable<DBObject> it = transaction.find();
663
		for (DBObject currentObj : it){
665
		final FindIterable<DBObject> it = transaction.find().noCursorTimeout(true);
666
		for (DBObject currentObj : it) {
664 667

  
665 668
			BasicDBObject newObj = new BasicDBObject();
666 669

  
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoResultSetListener.java
9 9
import com.google.common.collect.Lists;
10 10
import com.mongodb.BasicDBObject;
11 11
import com.mongodb.DBObject;
12
import com.mongodb.QueryBuilder;
13 12
import com.mongodb.client.FindIterable;
14 13
import com.mongodb.client.MongoCollection;
15
import com.mongodb.client.model.Filters;
16 14
import com.mongodb.client.model.Sorts;
17 15
import eu.dnetlib.enabling.resultset.ResultSet;
18 16
import eu.dnetlib.enabling.resultset.ResultSetAware;
......
28 26

  
29 27
	private static final Log log = LogFactory.getLog(MongoResultSetListener.class);
30 28

  
31
	private ConcurrentSizedMap<Integer, String> lastKeys = new ConcurrentSizedMap<Integer, String>();
29
	private ConcurrentSizedMap<Integer, String> lastKeys = new ConcurrentSizedMap<>();
32 30
	private Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("id"));
33 31

  
34 32
	private Function<DBObject, String> serializer;
35
	private Pattern filter;
36 33
	private MongoCollection<DBObject> collection;
34
	private Bson query;
37 35

  
38
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Pattern filter, final Function<DBObject, String> serializer) {
36
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Long from, final Long until, final Pattern filter, final Function<DBObject, String> serializer) {
39 37
		this.collection = collection;
40
		this.filter = filter;
41 38
		this.serializer = serializer;
39
		this.query = query(from, until, filter);
42 40
	}
43 41

  
44 42
	@Override
......
66 64
	}
67 65

  
68 66
	private ArrayList<DBObject> fetchNew(final int from, final int size) {
69
		FindIterable<DBObject> it = null;
70
		if (filter != null) {
71
			Bson query = Filters.regex("body", filter);
72
			it = collection.find(query);
73
		} else
74
			it = collection.find();
75

  
67
		final FindIterable<DBObject> it = collection.find(query).batchSize(size);
76 68
		return Lists.newArrayList(it.sort(sortByIdAsc).skip(from).limit(size));
77 69
	}
78 70

  
......
80 72
		if (log.isDebugEnabled()) {
81 73
			log.debug("trying to continue from previous key: " + lastKey);
82 74
		}
83
		Bson filterQuery = gt("id", lastKey);
84
		if (filter != null) {
85
			filterQuery = and(filterQuery, regex("body", filter));
86
		}
87
		final FindIterable<DBObject> it = collection.find(filterQuery).sort(sortByIdAsc).limit(size);
75
		final Bson q = and(query, gt("id", lastKey));
76
		final FindIterable<DBObject> it = collection.find(q).batchSize(size).sort(sortByIdAsc).limit(size);
88 77
		return Lists.newArrayList(it);
89 78
	}
90 79

  
80
	private Bson query(final Long from, final Long until, final Pattern pattern) {
81
		final Bson dateFilter = dateQuery(from, until);
82
		final Bson regexFilter = regexQuery(pattern);
83
		if (dateFilter != null & regexFilter != null) {
84
			return and(dateFilter, regexFilter);
85
		} else if (dateFilter != null) {
86
			return dateFilter;
87
		} else if (regexFilter != null) {
88
			return regexFilter;
89
		}
90
		return new BasicDBObject();
91
	}
92

  
93
	private Bson dateQuery(final Long from, final Long until) {
94
		if (from != null & until != null) {
95
			return and(gt("timestamp", from), lt("timestamp", until));
96
		}
97
		if (from != null) {
98
			return gt("timestamp", from);
99
		}
100
		if (until != null) {
101
			return lt("timestamp", until);
102
		}
103
		return null;
104
	}
105

  
106
	private Bson regexQuery(final Pattern pattern) {
107
		if (pattern != null) {
108
			return regex("body", pattern);
109
		}
110
		return null;
111
	}
112

  
91 113
	@Override
92 114
	public int getSize() {
93
		if (filter != null) {
94
			BasicDBObject query = (BasicDBObject) QueryBuilder.start("body").regex(filter).get();
95
			return (int) collection.count(query);
96
		}
97
		return (int) collection.count();
115
		return (int) collection.count(query);
98 116
	}
99 117

  
100 118
	@Override
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoMDStore.java
1 1
package eu.dnetlib.data.mdstore.modular.mongodb;
2 2

  
3
import java.util.*;
4
import java.util.concurrent.*;
5
import java.util.regex.Pattern;
6

  
3 7
import com.google.common.base.Function;
8
import com.google.common.collect.Iterables;
4 9
import com.google.common.collect.Iterators;
5 10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
6 12
import com.mongodb.BasicDBObject;
7 13
import com.mongodb.DBObject;
8 14
import com.mongodb.QueryBuilder;
......
10 16
import com.mongodb.client.ListIndexesIterable;
11 17
import com.mongodb.client.MongoCollection;
12 18
import com.mongodb.client.MongoDatabase;
19
import com.mongodb.client.model.IndexOptions;
13 20
import eu.dnetlib.data.mdstore.DocumentNotFoundException;
21
import eu.dnetlib.data.mdstore.MDStoreServiceException;
14 22
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
15 23
import eu.dnetlib.data.mdstore.modular.RecordParser;
16 24
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
17 25
import eu.dnetlib.enabling.resultset.ResultSetListener;
18
import eu.dnetlib.miscutils.collections.MappedCollection;
19
import eu.dnetlib.miscutils.functional.UnaryFunction;
20 26
import org.apache.commons.lang.StringUtils;
21 27
import org.apache.commons.logging.Log;
22 28
import org.apache.commons.logging.LogFactory;
23 29
import org.bson.conversions.Bson;
24 30
import org.springframework.beans.factory.annotation.Required;
25 31

  
26
import java.util.ArrayList;
27
import java.util.Iterator;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.concurrent.*;
31
import java.util.regex.Pattern;
32

  
33 32
public class MongoMDStore implements MDStore {
34 33

  
35 34
	private static final int BULK_SIZE = 500;
36 35
	private static final Log log = LogFactory.getLog(MongoMDStore.class);
37
	private static List<String> requiredIndicies = Lists.newArrayList("{ \"id\" : 1}", "{ \"timestamp\" : 1}", "{ \"originalId\" : 1}");
36

  
38 37
	private final boolean discardRecords;
39 38
	private String id;
40 39
	private MongoDatabase mongoDatabase;
......
44 43

  
45 44
	private RecordParser recordParser;
46 45

  
46
	private static List<String> indices = Lists.newArrayList("id", "timestamp", "originalId");
47
	private final IndexOptions options = new IndexOptions().background(true);
48

  
47 49
	public MongoMDStore(final String id,
48 50
			final MongoCollection<DBObject> collection,
49 51
			final RecordParser recordParser,
......
69 71

  
70 72
		ensureIndices();
71 73

  
72
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(100);
74
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
73 75
		final Object sentinel = new Object();
74 76
		int countStored = 0;
75
		final Callable<Integer> writer = new Callable<Integer>() {
76
			@Override
77
			public Integer call() throws Exception {
78
				final MongoBulkWritesManager bulkWritesManager =
79
						new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
80
				int count = 0;
81
				while (true) {
82
					try {
83
						final Object record = queue.take();
84
						if (record == sentinel) {
85
							bulkWritesManager.flushBulks();
86
							break;
87
						}
88
						count++;
89
						bulkWritesManager.insert((String) record);
90
					} catch (final InterruptedException e) {
91
						log.fatal("got exception in background thread", e);
92
						throw new IllegalStateException(e);
77
		final Callable<Integer> writer = () -> {
78
			final MongoBulkWritesManager bulkWritesManager =
79
					new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
80
			int count = 0;
81
			while (true) {
82
				try {
83
					final Object record = queue.take();
84
					if (record == sentinel) {
85
						bulkWritesManager.flushBulks();
86
						break;
93 87
					}
88
					count++;
89
					bulkWritesManager.insert((String) record);
90
				} catch (final InterruptedException e) {
91
					log.fatal("got exception in background thread", e);
92
					throw new IllegalStateException(e);
94 93
				}
95
				log.debug(String.format("extracted %s records from feeder queue", count));
96
				return count;
97 94
			}
95
			log.debug(String.format("extracted %s records from feeder queue", count));
96
			return count;
98 97
		};
99 98
		final ExecutorService executorService = Executors.newSingleThreadExecutor();
100 99
		Future<Integer> storedCountInt = executorService.submit(writer);
......
115 114
			log.error("Error on feeding mdstore with id:" + id, e);
116 115
			throw new IllegalStateException(e);
117 116
		}
118
		// double check
119
		ensureIndices();
120
		collection.createIndex(new BasicDBObject("id", 1));
121 117
		log.info("finished feeding mdstore " + id);
122 118
		return countStored;
123 119
	}
124 120

  
125 121
	public void ensureIndices() {
126
		for (final String key : Lists.newArrayList("id", "timestamp", "originalId")) {
127
			collection.createIndex(new BasicDBObject(key, 1));
122
		for (final String key : indices) {
123
			collection.createIndex(new BasicDBObject(key, 1), options);
128 124
		}
129 125
		if (mdformats != null) {
130 126
			for (final MDFormatDescription description : mdformats) {
131
				collection.createIndex(new BasicDBObject(description.getName(), 1));
127
				collection.createIndex(new BasicDBObject(description.getName(), 1), options);
132 128
			}
133 129
		}
134 130
	}
135 131

  
136 132
	public boolean isIndexed() {
137
		final ListIndexesIterable<DBObject> listIndexesIterable = collection.listIndexes(DBObject.class);
138
		return Lists.newArrayList(new MappedCollection<String, DBObject>(listIndexesIterable, new UnaryFunction<String, DBObject>() {
139

  
140
			@Override
141
			public String evaluate(final DBObject dbo) {
142
				return new BasicDBObject(dbo.toMap()).getString("key");
143
			}
144
		})).containsAll(requiredIndicies);
133
		final ListIndexesIterable<DBObject> found = collection.listIndexes(DBObject.class);
134
		return Sets.newHashSet(Iterables.transform(found, dbo -> {
135
			final Set<String> keyset = ((DBObject) dbo.get("key")).toMap().keySet();
136
			return Iterables.getFirst(keyset, "");
137
		})).containsAll(indices);
145 138
	}
146 139

  
147 140
	/**
......
168 161
	}
169 162

  
170 163
	@Override
171
	public ResultSetListener deliver(final String from, final String until, final String recordFilter) {
164
	public ResultSetListener deliver(final String from, final String until, final String recordFilter) throws MDStoreServiceException {
172 165
		return deliver(from, until, recordFilter, new SerializeMongoRecord());
173 166
	}
174 167

  
175 168
	@Override
176
	public ResultSetListener deliverIds(final String from, final String until, final String recordFilter) {
169
	public ResultSetListener deliverIds(final String from, final String until, final String recordFilter) throws MDStoreServiceException {
177 170
		return deliver(from, until, recordFilter, new SerializeMongoRecordId());
178 171
	}
179 172

  
180
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer) {
181
		ensureIndices();
182

  
173
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer)
174
			throws MDStoreServiceException {
183 175
		final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
184 176

  
185
		return new MongoResultSetListener(collection, filter, serializer);
177
		return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer);	}
178

  
179
	private Long parseLong(final String s) throws MDStoreServiceException {
180
		if (StringUtils.isBlank(s)) {
181
			return null;
182
		}
183
		try {
184
			return Long.valueOf(s);
185
		} catch (NumberFormatException e) {
186
			throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", e);
187
		}
186 188
	}
187 189

  
188 190
	@Override
189 191
	public Iterable<String> iterate() {
190
		return new Iterable<String>() {
191

  
192
			@Override
193
			public Iterator<String> iterator() {
194
				return Iterators.transform(collection.find().iterator(), new Function<DBObject, String>() {
195

  
196
					@Override
197
					public String apply(final DBObject arg) {
198
						return (String) arg.get("body");
199
					}
200
				});
201
			}
202
		};
192
		return () -> Iterators.transform(collection.find().iterator(), arg -> (String) arg.get("body"));
203 193
	}
204 194

  
205 195
	@Override

Also available in: Unified diff