Revision 46517
Added by Claudio Atzori about 7 years ago
MongoBulkWritesManager.java | ||
---|---|---|
12 | 12 |
import org.apache.commons.logging.Log; |
13 | 13 |
import org.apache.commons.logging.LogFactory; |
14 | 14 |
|
15 |
import static eu.dnetlib.data.mdstore.MDStoreConstants.*; |
|
15 |
import static eu.dnetlib.data.mdstore.utils.MDStoreConstants.*;
|
|
16 | 16 |
|
17 | 17 |
public class MongoBulkWritesManager { |
18 | 18 |
|
19 | 19 |
private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class); |
20 |
private final boolean discardRecords; |
|
21 |
private MongoCollection<MDStoreRecord> validCollection; |
|
22 |
private List<WriteModel<MDStoreRecord>> validBulkOperationList; |
|
20 |
private MongoCollection<MDStoreRecord> collection; |
|
21 |
private List<WriteModel<MDStoreRecord>> validBulkOpList; |
|
23 | 22 |
|
24 | 23 |
private BulkWriteOptions writeOptions; |
25 |
private MongoCollection<MDStoreRecord> discardedCollection;
|
|
26 |
private List<WriteModel<MDStoreRecord>> discardedBulkOperationList;
|
|
24 |
private MongoCollection<MDStoreRecord> discarded; |
|
25 |
private List<WriteModel<MDStoreRecord>> discardedBulkOpList; |
|
27 | 26 |
|
28 | 27 |
private int bulkSize; |
29 | 28 |
|
30 |
public MongoBulkWritesManager(final MongoCollection<MDStoreRecord> collection,
|
|
31 |
final MongoCollection<MDStoreRecord> discardedCollection,
|
|
32 |
final int bulkSize,
|
|
33 |
final boolean discardRecords) {
|
|
34 |
this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
|
35 |
this.validBulkOperationList = Lists.newArrayList();
|
|
29 |
public MongoBulkWritesManager( |
|
30 |
final MongoCollection<MDStoreRecord> collection,
|
|
31 |
final MongoCollection<MDStoreRecord> discarded,
|
|
32 |
final int bulkSize) {
|
|
33 |
this.collection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
|
34 |
this.validBulkOpList = Lists.newArrayList(); |
|
36 | 35 |
|
37 |
this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
|
38 |
this.discardedBulkOperationList = Lists.newArrayList();
|
|
36 |
this.discarded = discarded.withWriteConcern(WriteConcern.ACKNOWLEDGED);
|
|
37 |
this.discardedBulkOpList = Lists.newArrayList(); |
|
39 | 38 |
|
40 | 39 |
this.bulkSize = bulkSize; |
41 |
this.discardRecords = discardRecords; |
|
42 | 40 |
this.writeOptions = new BulkWriteOptions().ordered(false); |
43 | 41 |
} |
44 | 42 |
|
... | ... | |
49 | 47 |
if (log.isDebugEnabled()) { |
50 | 48 |
log.debug("Saving object" + r.getId()); |
51 | 49 |
} |
52 |
validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject(ID, r.getId()), r, new UpdateOptions().upsert(true)));
|
|
53 |
if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) {
|
|
54 |
validCollection.bulkWrite(validBulkOperationList, writeOptions);
|
|
55 |
validBulkOperationList.clear();
|
|
50 |
validBulkOpList.add(new ReplaceOneModel(new BasicDBObject(ID, r.getId()), r, new UpdateOptions().upsert(true))); |
|
51 |
if (((validBulkOpList.size() % bulkSize) == 0) && !validBulkOpList.isEmpty()) {
|
|
52 |
collection.bulkWrite(validBulkOpList, writeOptions);
|
|
53 |
validBulkOpList.clear(); |
|
56 | 54 |
} |
57 | 55 |
} else { |
58 |
if (discardRecords) { |
|
59 |
log.debug("parsed record seems invalid"); |
|
60 |
discardRecord(r); |
|
61 |
} |
|
56 |
log.debug("parsed record seems invalid"); |
|
57 |
discardRecord(r); |
|
62 | 58 |
} |
63 | 59 |
} catch (Throwable e) { |
64 |
if (discardRecords) { |
|
65 |
log.debug("unhandled exception: " + e.getMessage()); |
|
66 |
discardRecord(r); |
|
67 |
} |
|
60 |
log.debug("unhandled exception: " + e.getMessage()); |
|
61 |
discardRecord(r); |
|
68 | 62 |
} |
69 | 63 |
} |
70 | 64 |
|
71 | 65 |
private void discardRecord(final MDStoreRecord MDStoreRecord) { |
72 |
discardedBulkOperationList.add(new InsertOneModel(new BasicDBObject(BODY, MDStoreRecord)));
|
|
66 |
discardedBulkOpList.add(new InsertOneModel(new BasicDBObject(BODY, MDStoreRecord))); |
|
73 | 67 |
|
74 |
if (((discardedBulkOperationList.size() % bulkSize) == 0) && !discardedBulkOperationList.isEmpty()) {
|
|
75 |
discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
|
|
76 |
discardedBulkOperationList.clear();
|
|
68 |
if (((discardedBulkOpList.size() % bulkSize) == 0) && !discardedBulkOpList.isEmpty()) {
|
|
69 |
discarded.bulkWrite(discardedBulkOpList, writeOptions);
|
|
70 |
discardedBulkOpList.clear(); |
|
77 | 71 |
} |
78 | 72 |
} |
79 | 73 |
|
80 | 74 |
public void flushBulks() { |
81 | 75 |
//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk) |
82 | 76 |
//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.' |
83 |
if (!validBulkOperationList.isEmpty()) {
|
|
84 |
validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
|
|
85 |
validCollection.bulkWrite(validBulkOperationList, writeOptions);
|
|
77 |
if (!validBulkOpList.isEmpty()) { |
|
78 |
collection = getCollectionWithWriteConcern(collection, WriteConcern.JOURNALED);
|
|
79 |
collection.bulkWrite(validBulkOpList, writeOptions);
|
|
86 | 80 |
} |
87 |
if (!discardedBulkOperationList.isEmpty()) {
|
|
88 |
discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
|
|
89 |
discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
|
|
81 |
if (!discardedBulkOpList.isEmpty()) { |
|
82 |
discarded = getCollectionWithWriteConcern(discarded, WriteConcern.ACKNOWLEDGED);
|
|
83 |
discarded.bulkWrite(discardedBulkOpList, writeOptions);
|
|
90 | 84 |
} |
91 | 85 |
//setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode |
92 |
validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
|
|
93 |
discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
|
|
86 |
collection = getCollectionWithWriteConcern(collection, WriteConcern.ACKNOWLEDGED);
|
|
87 |
discarded = getCollectionWithWriteConcern(discarded, WriteConcern.ACKNOWLEDGED);
|
|
94 | 88 |
} |
95 | 89 |
|
96 | 90 |
private MongoCollection<MDStoreRecord> getCollectionWithWriteConcern(MongoCollection<MDStoreRecord> collection, WriteConcern writeConcern) { |
Also available in: Unified diff
metadata and transactionInfo managed using spring data repository