Project

General

Profile

« Previous | Next » 

Revision 46419

simplifying

View differences:

modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/mongo/MongoMDStore.java
38 38
	private static List<String> indexes = Arrays.asList(ID, TIMESTAMP, ORIGINAL_ID);
39 39
	private final boolean discardRecords;
40 40
	private String id;
41
	private MongoDatabase mongoDatabase;
42
	private MongoCollection<MDStoreRecord> collection;
41
	private MongoCollection<MDStoreRecord> recordCollection;
43 42
	private MongoCollection<MDStoreRecord> discardedCollection;
44 43

  
45 44
	public MongoMDStore(final String id,
......
47 46
			final boolean discardRecords,
48 47
			final MongoDatabase mongoDatabase) {
49 48
		this.id = id;
50
		this.mongoDatabase = mongoDatabase;
51
		this.collection = collection;
52
		this.discardedCollection = this.mongoDatabase.getCollection(DISCARDED_PREFIX + StringUtils.substringBefore(id, "_"), MDStoreRecord.class);
49
		this.recordCollection = collection;
50
		this.discardedCollection = mongoDatabase.getCollection(DISCARDED_PREFIX + StringUtils.substringBefore(id, "_"), MDStoreRecord.class);
53 51
		this.discardRecords = discardRecords;
54 52
	}
55 53

  
56 54
	@Override
57 55
	public int feed(final Stream<MDStoreRecord> records) {
58
		ensureIndices();
59

  
60 56
		final BlockingQueue<MDStoreRecord> queue = new ArrayBlockingQueue<>(BULK_SIZE);
61 57
		final MDStoreRecord sentinel = new MDStoreRecord();
62 58
		int countStored = 0;
63 59
		final Callable<Integer> writer = () -> {
64 60
			final MongoBulkWritesManager bulkWritesManager =
65
					new MongoBulkWritesManager(collection, discardedCollection, BULK_SIZE, discardRecords);
61
					new MongoBulkWritesManager(recordCollection, discardedCollection, BULK_SIZE, discardRecords);
66 62
			int count = 0;
67 63
			while (true) {
68 64
				try {
......
102 98
		}
103 99
		// double check
104 100
		ensureIndices();
105
		collection.createIndex(new BasicDBObject("id", 1));
101

  
106 102
		log.info("finished feeding mdstore " + id);
107 103
		return countStored;
108 104
	}
109 105

  
110 106
	public void ensureIndices() {
111
		indexes.forEach(key -> collection.createIndex(new BasicDBObject(key, 1)));
107
		indexes.forEach(key -> recordCollection.createIndex(new BasicDBObject(key, 1)));
112 108
	}
113 109

  
114 110
	public boolean isIndexed() {
115
		final ListIndexesIterable<DBObject> listIndexes = collection.listIndexes(DBObject.class);
111
		final ListIndexesIterable<DBObject> listIndexes = recordCollection.listIndexes(DBObject.class);
116 112
		final List<String> key = StreamUtils.createStreamFromIterator(listIndexes.iterator())
117 113
				.map(dbo -> new BasicDBObject(dbo.toMap()).getString("key"))
118 114
				.collect(Collectors.toList());
......
199 195
	}
200 196

  
201 197
	private Stream<MDStoreRecord> query(final Bson query) {
202
		final MongoCursor<MDStoreRecord> mongoCursor = collection.find(query).batchSize(BULK_SIZE).iterator();
198
		final MongoCursor<MDStoreRecord> mongoCursor = recordCollection.find(query).batchSize(BULK_SIZE).iterator();
203 199
		return DnetStreamSupport.generateStreamFromIterator(mongoCursor);
204 200
	}
205 201

  
206 202
	@Override
207 203
	public void deleteRecord(final String recordId) {
208
		collection.deleteOne(new BasicDBObject("id", recordId));
204
		recordCollection.deleteOne(new BasicDBObject("id", recordId));
209 205
	}
210 206

  
211 207
	@Override
......
225 221

  
226 222
	@Override
227 223
	public void truncate() {
228
		collection.drop();
224
		recordCollection.drop();
229 225
		discardedCollection.drop();
230 226
	}
231 227

  
232
	public DBObject getMDStoreMetadata() {
233
		return mongoDatabase.getCollection("metadata", DBObject.class).find(new BasicDBObject("mdId", getId())).first();
234
	}
235

  
236 228
	@Override
237
	public String getFormat() {
238
		return (String) getMDStoreMetadata().get("format");
239
	}
240

  
241
	@Override
242
	public String getInterpretation() {
243
		return (String) getMDStoreMetadata().get("interpretation");
244
	}
245

  
246
	@Override
247
	public String getLayout() {
248
		return (String) getMDStoreMetadata().get("layout");
249
	}
250

  
251
	@Override
252 229
	public String getId() {
253 230
		return id;
254 231
	}
......
257 234
		this.id = id;
258 235
	}
259 236

  
260
	public MongoCollection<MDStoreRecord> getCollection() {
261
		return collection;
237
	public MongoCollection<MDStoreRecord> getRecordCollection() {
238
		return recordCollection;
262 239
	}
263 240

  
264
	public void setCollection(final MongoCollection<MDStoreRecord> collection) {
265
		this.collection = collection;
241
	public void setRecordCollection(final MongoCollection<MDStoreRecord> recordCollection) {
242
		this.recordCollection = recordCollection;
266 243
	}
267 244

  
268 245
	@Override
269 246
	public int getSize() {
270
		return (int) collection.count();
247
		return (int) recordCollection.count();
271 248
	}
272 249

  
273 250
	public MongoCollection<MDStoreRecord> getDiscardedCollection() {
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/MDStore.java
8 8

  
9 9
	String getId() throws MDStoreException;
10 10

  
11
	String getFormat() throws MDStoreException;
12

  
13
	String getInterpretation() throws MDStoreException;
14

  
15
	String getLayout() throws MDStoreException;
16

  
17 11
	void truncate() throws MDStoreException;
18 12

  
19 13
	int feed(Stream<MDStoreRecord> records) throws MDStoreException;

Also available in: Unified diff