1
|
package eu.dnetlib.data.mdstore.mongo;
|
2
|
|
3
|
import java.util.Arrays;
|
4
|
import java.util.List;
|
5
|
import java.util.Map;
|
6
|
import java.util.Set;
|
7
|
import java.util.stream.Collectors;
|
8
|
import java.util.stream.Stream;
|
9
|
import javax.annotation.PostConstruct;
|
10
|
|
11
|
import com.google.common.collect.Lists;
|
12
|
import com.google.common.collect.Sets;
|
13
|
import com.mongodb.BasicDBObject;
|
14
|
import com.mongodb.WriteConcern;
|
15
|
import com.mongodb.client.MongoCollection;
|
16
|
import com.mongodb.client.MongoDatabase;
|
17
|
import com.mongodb.client.model.IndexOptions;
|
18
|
import com.mongodb.client.model.UpdateOptions;
|
19
|
import eu.dnetlib.data.mdstore.MDStore;
|
20
|
import eu.dnetlib.data.mdstore.MDStoreService;
|
21
|
import eu.dnetlib.data.mdstore.model.MDStoreRecord;
|
22
|
import eu.dnetlib.data.mdstore.model.Metadata;
|
23
|
import eu.dnetlib.data.mdstore.model.Transaction;
|
24
|
import eu.dnetlib.data.mdstore.repository.MetadataRepository;
|
25
|
import eu.dnetlib.data.mdstore.utils.MDStoreException;
|
26
|
import eu.dnetlib.data.mdstore.utils.MDStoreProfileManager;
|
27
|
import eu.dnetlib.data.mdstore.utils.MDStoreRuntimeException;
|
28
|
import eu.dnetlib.data.mdstore.utils.RecordParser;
|
29
|
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
|
30
|
import org.apache.commons.lang3.StringUtils;
|
31
|
import org.apache.commons.logging.Log;
|
32
|
import org.apache.commons.logging.LogFactory;
|
33
|
import org.springframework.beans.factory.annotation.Autowired;
|
34
|
import org.springframework.beans.factory.annotation.Value;
|
35
|
import org.springframework.stereotype.Component;
|
36
|
|
37
|
import static com.mongodb.client.model.Filters.eq;
|
38
|
import static eu.dnetlib.data.mdstore.utils.MDStoreConstants.DISCARDED_PREFIX;
|
39
|
import static eu.dnetlib.data.mdstore.utils.MDStoreConstants.ID;
|
40
|
|
41
|
/**
|
42
|
* Created by claudio on 24/03/2017.
|
43
|
*/
|
44
|
@Component
|
45
|
public class MongoMDStoreService implements MDStoreService {
|
46
|
|
47
|
private static final Log log = LogFactory.getLog(MongoMDStoreService.class);
|
48
|
|
49
|
/**
|
50
|
* The max number of concurrent transactions per mdstore.
|
51
|
*/
|
52
|
private int maxTransactions = 5;
|
53
|
|
54
|
/**
|
55
|
* Age threshold, in days, passed to {@code Metadata#isCurrentIdExpired(int)} when a refresh transaction is committed.
|
56
|
*/
|
57
|
private int expiredDays = 3;
|
58
|
|
59
|
@Autowired
|
60
|
private MDStoreProfileManager profileManager;
|
61
|
|
62
|
@Autowired
|
63
|
private MetadataRepository metadata;
|
64
|
|
65
|
@Autowired
|
66
|
private MongoDatabase mongoDatabase;
|
67
|
|
68
|
@Value("#{'${msro.worker.mdstore.skip.prefixes}'.split(',')}")
|
69
|
private List<String> skipPrefixes = Lists.newArrayList("resolved", "metadata");
|
70
|
|
71
|
/**
|
72
|
* Bootstrap manager.
|
73
|
*/
|
74
|
@PostConstruct
|
75
|
private void init() {
|
76
|
log.info("Bootstrap mdstore ...");
|
77
|
|
78
|
//drop all the stale expiring and the transactions mdstores
|
79
|
metadata.findAll().forEach(d -> {
|
80
|
metadata.bootstrap(d.getMdId());
|
81
|
log.debug(String.format("Added %s to Metadata Manager data structure", d.getMdId()));
|
82
|
});
|
83
|
|
84
|
final Set<String> metadataMdIds = Sets.newHashSet(metadata.listMdIds());
|
85
|
|
86
|
//delete from metadataManager all the mdId(s) that are not in metadata
|
87
|
Sets.difference(
|
88
|
Sets.newHashSet(metadata.listMdIds()),
|
89
|
metadataMdIds).forEach(mdId -> metadata.delete(mdId));
|
90
|
|
91
|
//delete all the collections that are not referred by metadata.currentId(s)
|
92
|
Sets.difference(
|
93
|
Sets.newHashSet(metadata.listCurrentIds()),
|
94
|
metadataMdIds).forEach(name -> {
|
95
|
log.info(String.format("dropping collection %s", name));
|
96
|
mongoDatabase.getCollection(name).drop();
|
97
|
});
|
98
|
|
99
|
log.info("Bootstrap mdstore complete!");
|
100
|
}
|
101
|
|
102
|
@Override
|
103
|
public String create(final String format, final String layout, final String interpretation) throws MDStoreException {
|
104
|
log.debug("Creating new mdstore");
|
105
|
|
106
|
final String mdId = getProfileManager().registerProfile(format, layout, interpretation);
|
107
|
metadata.save(Metadata.create()
|
108
|
.setMdId(mdId)
|
109
|
.setCurrentId(mdId)
|
110
|
.setFormat(format)
|
111
|
.setLayout(layout)
|
112
|
.setInterpretation(interpretation));
|
113
|
mongoDatabase.createCollection(mdId);
|
114
|
((MongoMDStore) getMDStore(mdId)).ensureIndices();
|
115
|
|
116
|
log.info("Created new mdstore: " + mdId);
|
117
|
return mdId;
|
118
|
}
|
119
|
|
120
|
@Override
|
121
|
public void delete(final String mdId) throws MDStoreException {
|
122
|
if (!metadata.exists(mdId)) {
|
123
|
throw new MDStoreException(String.format("cannot find %s in transaction manager table", mdId));
|
124
|
}
|
125
|
metadata.delete(mdId);
|
126
|
getMDStore(mdId).truncate();
|
127
|
getProfileManager().deleteProfile(mdId);
|
128
|
log.info("deleted mdId: " + mdId);
|
129
|
}
|
130
|
|
131
|
@Override
|
132
|
public Stream<MDStoreRecord> stream(final String mdId) throws MDStoreException {
|
133
|
return stream(mdId, null, null);
|
134
|
}
|
135
|
|
136
|
@Override
|
137
|
public Stream<MDStoreRecord> stream(final String mdId, final String from, final String until) throws MDStoreException {
|
138
|
return stream(mdId, from, until, null);
|
139
|
}
|
140
|
|
141
|
@Override
|
142
|
public Stream<MDStoreRecord> stream(final String mdId, final String from, final String until, final String recordFilter) throws MDStoreException {
|
143
|
return getMDStore(mdId).deliver(from, until, recordFilter);
|
144
|
}
|
145
|
|
146
|
@Override
|
147
|
public Stream<MDStoreRecord> bulkStream(final String format, final String layout, final String interpretation) throws MDStoreException {
|
148
|
return list(format, layout, interpretation).stream()
|
149
|
.map(mdId -> {
|
150
|
log.debug("bulk deliver of mdId: " + mdId);
|
151
|
try {
|
152
|
return getMDStore(mdId).deliver();
|
153
|
} catch (MDStoreException e) {
|
154
|
throw new MDStoreRuntimeException(e);
|
155
|
}
|
156
|
}).reduce(Stream::concat).orElseGet(Stream::empty);
|
157
|
}
|
158
|
|
159
|
@Override
|
160
|
public MDStoreRecord findOne(final String mdId, final String recordId) throws MDStoreException {
|
161
|
return getMDStore(mdId).getRecord(recordId);
|
162
|
}
|
163
|
|
164
|
@Override
|
165
|
public List<String> list() throws MDStoreException {
|
166
|
return DnetStreamSupport.stream(metadata.findAll().iterator())
|
167
|
.map(m -> m.getMdId())
|
168
|
.collect(Collectors.toList());
|
169
|
}
|
170
|
|
171
|
@Override
|
172
|
public List<String> list(final String format, final String layout, final String interpretation) throws MDStoreException {
|
173
|
return metadata.findByFormatAndLayoutAndInterpretation(format, layout, interpretation).stream()
|
174
|
.map(m -> m.getMdId())
|
175
|
.collect(Collectors.toList());
|
176
|
}
|
177
|
|
178
|
@Override
|
179
|
public long size(final String mdId) throws MDStoreException {
|
180
|
return getMDStore(mdId).getSize();
|
181
|
}
|
182
|
|
183
|
@Override
|
184
|
public long size(final String format, final String layout, final String interpretation) {
|
185
|
return metadata.sumOfSize(format, layout, interpretation);
|
186
|
}
|
187
|
|
188
|
@Override
|
189
|
public void store(final String mdId, final Stream<String> records, final FeedMode feedMode) throws MDStoreException {
|
190
|
final boolean refresh = FeedMode.REFRESH.equals(feedMode);
|
191
|
final String txId = startTransaction(mdId, refresh);
|
192
|
final MDStore mdstore = getMDStore(mdId, txId);
|
193
|
try {
|
194
|
if (refresh) {
|
195
|
mdstore.truncate();
|
196
|
}
|
197
|
mdstore.feed(records.map(new RecordParser()));
|
198
|
|
199
|
final Metadata m = metadata.findByMdId(mdId);
|
200
|
|
201
|
final Map<String, Transaction> txs = m.getTransactionMap();
|
202
|
if (!txs.containsKey(txId)) {
|
203
|
throw new MDStoreException("Error, unable to find transaction with Id " + txId);
|
204
|
}
|
205
|
|
206
|
if (txs.get(txId).getRefresh()) {
|
207
|
if (!m.isCurrentIdExpired(expiredDays)) {
|
208
|
doDrop(m.getCurrentId());
|
209
|
}
|
210
|
m.setCurrentId(txId);
|
211
|
|
212
|
log.debug(String.format("commit refresh: replaced collection %s with %s", mdId, txId));
|
213
|
} else {
|
214
|
updateIncremental(txId, m.getCurrentId());
|
215
|
doDrop(txId);
|
216
|
log.debug(String.format("commit incremental: "));
|
217
|
}
|
218
|
final long size = getRecordCollection(m.getCurrentId()).count();
|
219
|
metadata.commit(m.setSize(size), txId);
|
220
|
getProfileManager().updateSize(mdId, size);
|
221
|
log.info("Finished feeding mdstore " + mdId + " - new size: " + size);
|
222
|
} catch (Throwable e) {
|
223
|
if (txId != null) {
|
224
|
log.error("Dropping transaction : " + txId);
|
225
|
metadata.dropTransaction(mdId, txId);
|
226
|
}
|
227
|
throw new MDStoreException("Error feeding mdstore: " + mdId, e);
|
228
|
}
|
229
|
}
|
230
|
|
231
|
/// helpers
|
232
|
private String startTransaction(final String mdId, final boolean refresh) throws MDStoreException {
|
233
|
log.info("Start transaction for metadata store " + mdId);
|
234
|
|
235
|
final Metadata m = metadata.findByMdId(mdId);
|
236
|
|
237
|
if (m.getTransactions().size() > getMaxTransactions()) {
|
238
|
throw new MDStoreException(
|
239
|
String.format("Cannot create more than %s transactions, found: %s, mdId: %s", getMaxTransactions(), m.getTransactions().size(), mdId));
|
240
|
}
|
241
|
|
242
|
final String txId = mdId + "::" + System.currentTimeMillis();
|
243
|
metadata.addTransaction(mdId,
|
244
|
Transaction.create()
|
245
|
.setId(txId)
|
246
|
.setRefresh(refresh));
|
247
|
|
248
|
return txId;
|
249
|
}
|
250
|
|
251
|
/**
|
252
|
* Update incremental.
|
253
|
*
|
254
|
* @param transactionId the transaction id
|
255
|
* @param currentId the current id
|
256
|
*/
|
257
|
private void updateIncremental(final String transactionId, final String currentId) {
|
258
|
final MongoCollection<MDStoreRecord> source = mongoDatabase.getCollection(transactionId, MDStoreRecord.class);
|
259
|
final MongoCollection<MDStoreRecord> destination = mongoDatabase.getCollection(currentId, MDStoreRecord.class)
|
260
|
.withWriteConcern(WriteConcern.JOURNALED);
|
261
|
|
262
|
//TODO reimplement using bulk writes on the destination collection
|
263
|
for (MDStoreRecord record : source.find().batchSize(100).noCursorTimeout(true)) {
|
264
|
|
265
|
final MDStoreRecord r = record.clone();
|
266
|
if (StringUtils.isNotBlank(r.getId())) {
|
267
|
destination.replaceOne(eq(ID, r.getId()), r, new UpdateOptions().upsert(true));
|
268
|
}
|
269
|
}
|
270
|
}
|
271
|
|
272
|
private MDStore getMDStore(final String mdId, final String currentId) {
|
273
|
return new MongoMDStore(
|
274
|
mdId,
|
275
|
getRecordCollection(currentId),
|
276
|
getRecordCollection(DISCARDED_PREFIX + currentId),
|
277
|
metadata);
|
278
|
}
|
279
|
|
280
|
private MDStore getMDStore(final String id) {
|
281
|
final Metadata m = metadata.findByMdId(id);
|
282
|
return getMDStore(id, m.getCurrentId());
|
283
|
}
|
284
|
|
285
|
private void doDrop(final String mdId) {
|
286
|
mongoDatabase.getCollection(mdId).drop();
|
287
|
mongoDatabase.getCollection(DISCARDED_PREFIX + mdId).drop();
|
288
|
log.debug("deleted collection " + mdId);
|
289
|
}
|
290
|
|
291
|
private MongoCollection<MDStoreRecord> getRecordCollection(final String collectionId) {
|
292
|
return mongoDatabase.getCollection(collectionId, MDStoreRecord.class);
|
293
|
}
|
294
|
|
295
|
private void createIndex(final String fieldName, final MongoCollection<?> collection) {
|
296
|
Arrays.asList(collection).forEach(c -> {
|
297
|
log.info(String.format("Create index in %s", c.getNamespace().getCollectionName()));
|
298
|
c.createIndex(
|
299
|
new BasicDBObject(fieldName, 1),
|
300
|
new IndexOptions().background(true));
|
301
|
});
|
302
|
}
|
303
|
|
304
|
public int getMaxTransactions() {
|
305
|
return maxTransactions;
|
306
|
}
|
307
|
|
308
|
public void setMaxTransactions(final int maxTransactions) {
|
309
|
this.maxTransactions = maxTransactions;
|
310
|
}
|
311
|
|
312
|
public int getExpiredDays() {
|
313
|
return expiredDays;
|
314
|
}
|
315
|
|
316
|
public void setExpiredDays(final int expiredDays) {
|
317
|
this.expiredDays = expiredDays;
|
318
|
}
|
319
|
|
320
|
public MDStoreProfileManager getProfileManager() {
|
321
|
return profileManager;
|
322
|
}
|
323
|
|
324
|
public void setProfileManager(final MDStoreProfileManager profileManager) {
|
325
|
this.profileManager = profileManager;
|
326
|
}
|
327
|
}
|