1
|
package eu.dnetlib.data.mdstore.modular.newmongodb;
|
2
|
|
3
|
|
4
|
import com.google.common.collect.Lists;
|
5
|
import com.mongodb.BasicDBObject;
|
6
|
import com.mongodb.WriteConcern;
|
7
|
import com.mongodb.client.MongoCollection;
|
8
|
import com.mongodb.client.model.*;
|
9
|
import eu.dnetlib.data.mdstore.modular.newmongodb.model.MDStoreRecord;
|
10
|
import org.apache.commons.lang3.StringUtils;
|
11
|
import org.apache.commons.logging.Log;
|
12
|
import org.apache.commons.logging.LogFactory;
|
13
|
import org.bson.Document;
|
14
|
|
15
|
import java.util.List;
|
16
|
import static eu.dnetlib.data.mdstore.modular.newmongodb.model.MDStoreEncoder.*;
|
17
|
import static eu.dnetlib.data.mdstore.modular.mongodb.utils.MDStoreConstants.*;
|
18
|
/**
|
19
|
* Created by sandro on 5/23/17.
|
20
|
*/
|
21
|
public class MongoBulkWritesManager {
|
22
|
|
23
|
private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
|
24
|
private MongoCollection<Document> collection;
|
25
|
private List<WriteModel<Document>> validBulkOpList;
|
26
|
|
27
|
private BulkWriteOptions writeOptions;
|
28
|
private MongoCollection<Document> discarded;
|
29
|
private List<WriteModel<Document>> discardedBulkOpList;
|
30
|
|
31
|
private int bulkSize;
|
32
|
|
33
|
public MongoBulkWritesManager(
|
34
|
final MongoCollection<Document> collection,
|
35
|
final MongoCollection<Document> discarded,
|
36
|
final int bulkSize) {
|
37
|
this.collection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
38
|
this.validBulkOpList = Lists.newArrayList();
|
39
|
|
40
|
this.discarded = discarded.withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
41
|
this.discardedBulkOpList = Lists.newArrayList();
|
42
|
|
43
|
this.bulkSize = bulkSize;
|
44
|
this.writeOptions = new BulkWriteOptions().ordered(false);
|
45
|
}
|
46
|
|
47
|
public void insert(final MDStoreRecord r) {
|
48
|
try {
|
49
|
if (StringUtils.isNotBlank(r.getRecordIdentifier())) {
|
50
|
|
51
|
if (log.isDebugEnabled()) {
|
52
|
log.debug("Saving object" + r.getRecordIdentifier());
|
53
|
}
|
54
|
new InsertOneModel<>(encodeMDStoreRecord(r));
|
55
|
validBulkOpList.add(new InsertOneModel<>(encodeMDStoreRecord(r)));
|
56
|
if (((validBulkOpList.size() % bulkSize) == 0) && !validBulkOpList.isEmpty()) {
|
57
|
collection.bulkWrite(validBulkOpList, writeOptions);
|
58
|
validBulkOpList.clear();
|
59
|
}
|
60
|
} else {
|
61
|
log.debug("parsed record seems invalid");
|
62
|
discardRecord(r);
|
63
|
}
|
64
|
} catch (Throwable e) {
|
65
|
log.debug("unhandled exception: " + e.getMessage());
|
66
|
discardRecord(r);
|
67
|
}
|
68
|
}
|
69
|
|
70
|
private void discardRecord(final MDStoreRecord record) {
|
71
|
try {
|
72
|
final Document result = new Document();
|
73
|
result.put(BODY, record.getBody());
|
74
|
discardedBulkOpList.add(new InsertOneModel<>(result ));
|
75
|
if (((discardedBulkOpList.size() % bulkSize) == 0) && !discardedBulkOpList.isEmpty()) {
|
76
|
discarded.bulkWrite(discardedBulkOpList, writeOptions);
|
77
|
discardedBulkOpList.clear();
|
78
|
}
|
79
|
} catch (Throwable e) {
|
80
|
log.error("Error on discard Record", e);
|
81
|
}
|
82
|
}
|
83
|
|
84
|
public void flushBulks() {
|
85
|
//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
|
86
|
//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
|
87
|
if (!validBulkOpList.isEmpty()) {
|
88
|
collection = getCollectionWithWriteConcern(collection, WriteConcern.JOURNALED);
|
89
|
collection.bulkWrite(validBulkOpList, writeOptions);
|
90
|
}
|
91
|
if (!discardedBulkOpList.isEmpty()) {
|
92
|
discarded = getCollectionWithWriteConcern(discarded, WriteConcern.ACKNOWLEDGED);
|
93
|
discarded.bulkWrite(discardedBulkOpList, writeOptions);
|
94
|
}
|
95
|
//setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode
|
96
|
collection = getCollectionWithWriteConcern(collection, WriteConcern.ACKNOWLEDGED);
|
97
|
discarded = getCollectionWithWriteConcern(discarded, WriteConcern.ACKNOWLEDGED);
|
98
|
}
|
99
|
|
100
|
private MongoCollection<Document> getCollectionWithWriteConcern(MongoCollection<Document> collection, WriteConcern writeConcern) {
|
101
|
return collection.withWriteConcern(writeConcern);
|
102
|
}
|
103
|
}
|