38 |
38 |
private static List<String> indexes = Arrays.asList(ID, TIMESTAMP, ORIGINAL_ID);
|
39 |
39 |
private final boolean discardRecords;
|
40 |
40 |
private String id;
|
41 |
|
private MongoDatabase mongoDatabase;
|
42 |
|
private MongoCollection<MDStoreRecord> collection;
|
|
41 |
private MongoCollection<MDStoreRecord> recordCollection;
|
43 |
42 |
private MongoCollection<MDStoreRecord> discardedCollection;
|
44 |
43 |
|
45 |
44 |
public MongoMDStore(final String id,
|
... | ... | |
47 |
46 |
final boolean discardRecords,
|
48 |
47 |
final MongoDatabase mongoDatabase) {
|
49 |
48 |
this.id = id;
|
50 |
|
this.mongoDatabase = mongoDatabase;
|
51 |
|
this.collection = collection;
|
52 |
|
this.discardedCollection = this.mongoDatabase.getCollection(DISCARDED_PREFIX + StringUtils.substringBefore(id, "_"), MDStoreRecord.class);
|
|
49 |
this.recordCollection = collection;
|
|
50 |
this.discardedCollection = mongoDatabase.getCollection(DISCARDED_PREFIX + StringUtils.substringBefore(id, "_"), MDStoreRecord.class);
|
53 |
51 |
this.discardRecords = discardRecords;
|
54 |
52 |
}
|
55 |
53 |
|
56 |
54 |
@Override
|
57 |
55 |
public int feed(final Stream<MDStoreRecord> records) {
|
58 |
|
ensureIndices();
|
59 |
|
|
60 |
56 |
final BlockingQueue<MDStoreRecord> queue = new ArrayBlockingQueue<>(BULK_SIZE);
|
61 |
57 |
final MDStoreRecord sentinel = new MDStoreRecord();
|
62 |
58 |
int countStored = 0;
|
63 |
59 |
final Callable<Integer> writer = () -> {
|
64 |
60 |
final MongoBulkWritesManager bulkWritesManager =
|
65 |
|
new MongoBulkWritesManager(collection, discardedCollection, BULK_SIZE, discardRecords);
|
|
61 |
new MongoBulkWritesManager(recordCollection, discardedCollection, BULK_SIZE, discardRecords);
|
66 |
62 |
int count = 0;
|
67 |
63 |
while (true) {
|
68 |
64 |
try {
|
... | ... | |
102 |
98 |
}
|
103 |
99 |
// double check
|
104 |
100 |
ensureIndices();
|
105 |
|
collection.createIndex(new BasicDBObject("id", 1));
|
|
101 |
|
106 |
102 |
log.info("finished feeding mdstore " + id);
|
107 |
103 |
return countStored;
|
108 |
104 |
}
|
109 |
105 |
|
110 |
106 |
public void ensureIndices() {
|
111 |
|
indexes.forEach(key -> collection.createIndex(new BasicDBObject(key, 1)));
|
|
107 |
indexes.forEach(key -> recordCollection.createIndex(new BasicDBObject(key, 1)));
|
112 |
108 |
}
|
113 |
109 |
|
114 |
110 |
public boolean isIndexed() {
|
115 |
|
final ListIndexesIterable<DBObject> listIndexes = collection.listIndexes(DBObject.class);
|
|
111 |
final ListIndexesIterable<DBObject> listIndexes = recordCollection.listIndexes(DBObject.class);
|
116 |
112 |
final List<String> key = StreamUtils.createStreamFromIterator(listIndexes.iterator())
|
117 |
113 |
.map(dbo -> new BasicDBObject(dbo.toMap()).getString("key"))
|
118 |
114 |
.collect(Collectors.toList());
|
... | ... | |
199 |
195 |
}
|
200 |
196 |
|
201 |
197 |
private Stream<MDStoreRecord> query(final Bson query) {
|
202 |
|
final MongoCursor<MDStoreRecord> mongoCursor = collection.find(query).batchSize(BULK_SIZE).iterator();
|
|
198 |
final MongoCursor<MDStoreRecord> mongoCursor = recordCollection.find(query).batchSize(BULK_SIZE).iterator();
|
203 |
199 |
return DnetStreamSupport.generateStreamFromIterator(mongoCursor);
|
204 |
200 |
}
|
205 |
201 |
|
206 |
202 |
@Override
|
207 |
203 |
public void deleteRecord(final String recordId) {
|
208 |
|
collection.deleteOne(new BasicDBObject("id", recordId));
|
|
204 |
recordCollection.deleteOne(new BasicDBObject("id", recordId));
|
209 |
205 |
}
|
210 |
206 |
|
211 |
207 |
@Override
|
... | ... | |
225 |
221 |
|
226 |
222 |
@Override
|
227 |
223 |
public void truncate() {
|
228 |
|
collection.drop();
|
|
224 |
recordCollection.drop();
|
229 |
225 |
discardedCollection.drop();
|
230 |
226 |
}
|
231 |
227 |
|
232 |
|
public DBObject getMDStoreMetadata() {
|
233 |
|
return mongoDatabase.getCollection("metadata", DBObject.class).find(new BasicDBObject("mdId", getId())).first();
|
234 |
|
}
|
235 |
|
|
236 |
228 |
@Override
|
237 |
|
public String getFormat() {
|
238 |
|
return (String) getMDStoreMetadata().get("format");
|
239 |
|
}
|
240 |
|
|
241 |
|
@Override
|
242 |
|
public String getInterpretation() {
|
243 |
|
return (String) getMDStoreMetadata().get("interpretation");
|
244 |
|
}
|
245 |
|
|
246 |
|
@Override
|
247 |
|
public String getLayout() {
|
248 |
|
return (String) getMDStoreMetadata().get("layout");
|
249 |
|
}
|
250 |
|
|
251 |
|
@Override
|
252 |
229 |
public String getId() {
|
253 |
230 |
return id;
|
254 |
231 |
}
|
... | ... | |
257 |
234 |
this.id = id;
|
258 |
235 |
}
|
259 |
236 |
|
260 |
|
public MongoCollection<MDStoreRecord> getCollection() {
|
261 |
|
return collection;
|
|
237 |
public MongoCollection<MDStoreRecord> getRecordCollection() {
|
|
238 |
return recordCollection;
|
262 |
239 |
}
|
263 |
240 |
|
264 |
|
public void setCollection(final MongoCollection<MDStoreRecord> collection) {
|
265 |
|
this.collection = collection;
|
|
241 |
public void setRecordCollection(final MongoCollection<MDStoreRecord> recordCollection) {
|
|
242 |
this.recordCollection = recordCollection;
|
266 |
243 |
}
|
267 |
244 |
|
268 |
245 |
@Override
|
269 |
246 |
public int getSize() {
|
270 |
|
return (int) collection.count();
|
|
247 |
return (int) recordCollection.count();
|
271 |
248 |
}
|
272 |
249 |
|
273 |
250 |
public MongoCollection<MDStoreRecord> getDiscardedCollection() {
|
simplifying