Revision 47517
Added by Alessia Bardi almost 7 years ago
MongoBulkWritesManager.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mdstore.modular.mongodb; |
2 | 2 |
|
3 |
import java.util.List; |
|
4 |
import java.util.Map; |
|
5 |
|
|
3 | 6 |
import com.google.common.collect.Lists; |
4 | 7 |
import com.mongodb.BasicDBObject; |
5 | 8 |
import com.mongodb.DBObject; |
6 | 9 |
import com.mongodb.WriteConcern; |
7 | 10 |
import com.mongodb.client.MongoCollection; |
8 | 11 |
import com.mongodb.client.model.*; |
12 |
import eu.dnetlib.data.mdstore.MDStoreServiceException; |
|
9 | 13 |
import eu.dnetlib.data.mdstore.modular.MDFormatDescription; |
10 | 14 |
import eu.dnetlib.data.mdstore.modular.RecordParser; |
11 | 15 |
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser; |
16 |
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParserException; |
|
12 | 17 |
import org.apache.commons.logging.Log; |
13 | 18 |
import org.apache.commons.logging.LogFactory; |
14 | 19 |
|
15 |
import java.util.List; |
|
16 |
import java.util.Map; |
|
17 |
|
|
18 | 20 |
public class MongoBulkWritesManager { |
19 | 21 |
|
20 | 22 |
private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class); |
21 | 23 |
private final boolean discardRecords; |
22 |
private final boolean indexRecords;
|
|
23 |
private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
|
|
24 |
private final List<MDFormatDescription> mdref;
|
|
25 |
private RecordParser recordParser;
|
|
24 |
private final boolean indexRecords;
|
|
25 |
private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
|
|
26 |
private final List<MDFormatDescription> mdref;
|
|
27 |
private RecordParser recordParser;
|
|
26 | 28 |
private MongoCollection<DBObject> validCollection; |
27 | 29 |
private List<WriteModel<DBObject>> validBulkOperationList; |
28 | 30 |
private BulkWriteOptions writeOptions; |
... | ... | |
31 | 33 |
private int bulkSize; |
32 | 34 |
|
33 | 35 |
public MongoBulkWritesManager(final MongoCollection<DBObject> collection, |
34 |
final MongoCollection<DBObject> discardedCollection,
|
|
35 |
final List<MDFormatDescription> mdref,
|
|
36 |
final int bulkSize,
|
|
37 |
final RecordParser parser,
|
|
38 |
final boolean discardRecords) {
|
|
36 |
final MongoCollection<DBObject> discardedCollection,
|
|
37 |
final List<MDFormatDescription> mdref,
|
|
38 |
final int bulkSize,
|
|
39 |
final RecordParser parser,
|
|
40 |
final boolean discardRecords) {
|
|
39 | 41 |
this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED); |
40 | 42 |
this.validBulkOperationList = Lists.newArrayList(); |
41 | 43 |
|
... | ... | |
45 | 47 |
this.bulkSize = bulkSize; |
46 | 48 |
this.recordParser = parser; |
47 | 49 |
this.discardRecords = discardRecords; |
48 |
this.mdref = mdref;
|
|
50 |
this.mdref = mdref;
|
|
49 | 51 |
|
50 |
this.indexRecords = this.mdref != null;
|
|
51 |
this.writeOptions = new BulkWriteOptions().ordered(false);
|
|
52 |
this.indexRecords = this.mdref != null;
|
|
53 |
this.writeOptions = new BulkWriteOptions().ordered(false);
|
|
52 | 54 |
} |
53 | 55 |
|
54 |
public void insert(final String record) { |
|
56 |
public void insert(final String record) throws MDStoreServiceException { |
|
57 |
Map<String, String> recordProperties = null; |
|
55 | 58 |
try { |
56 |
final Map<String, String> recordProperties = recordParser.parseRecord(record); |
|
57 |
Map<String, List<String>> indexRecordField = null; |
|
58 |
if (indexRecords) { |
|
59 |
indexRecordField = indexFieldRecordParser.parseRecord(record, mdref); |
|
60 |
} |
|
61 |
|
|
62 |
log.debug("found props: " + recordProperties); |
|
63 |
if (recordProperties.containsKey("id")) { |
|
64 |
final DBObject obj = buildDBObject(record, recordProperties, indexRecordField); |
|
65 |
if (log.isDebugEnabled()) { |
|
66 |
log.debug("Saving object" + obj); |
|
67 |
} |
|
68 |
validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true))); |
|
69 |
if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) { |
|
70 |
validCollection.bulkWrite(validBulkOperationList, writeOptions); |
|
71 |
validBulkOperationList.clear(); |
|
72 |
} |
|
73 |
} else { |
|
74 |
if (discardRecords) { |
|
75 |
log.debug("parsed record seems invalid"); |
|
76 |
discardRecord(record); |
|
77 |
} |
|
78 |
} |
|
59 |
recordProperties = recordParser.parseRecord(record); |
|
79 | 60 |
} catch (Throwable e) { |
80 | 61 |
if (discardRecords) { |
81 | 62 |
log.debug("unhandled exception: " + e.getMessage()); |
82 | 63 |
discardRecord(record); |
83 | 64 |
} |
84 | 65 |
} |
66 |
Map<String, List<String>> indexRecordField = null; |
|
67 |
try { |
|
68 |
if (indexRecords) { |
|
69 |
indexRecordField = indexFieldRecordParser.parseRecord(record, mdref); |
|
70 |
} |
|
71 |
} catch (IndexFieldRecordParserException e) { |
|
72 |
// could not index record fields |
|
73 |
throw new MDStoreServiceException("Are you using the correct type of store / index definition for the records in " + validCollection.getNamespace() + " ?", e); |
|
74 |
} |
|
75 |
|
|
76 |
log.debug("found props: " + recordProperties); |
|
77 |
if (recordProperties.containsKey("id")) { |
|
78 |
final DBObject obj = buildDBObject(record, recordProperties, indexRecordField); |
|
79 |
if (log.isDebugEnabled()) { |
|
80 |
log.debug("Saving object" + obj); |
|
81 |
} |
|
82 |
validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true))); |
|
83 |
if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) { |
|
84 |
validCollection.bulkWrite(validBulkOperationList, writeOptions); |
|
85 |
validBulkOperationList.clear(); |
|
86 |
} |
|
87 |
} else { |
|
88 |
if (discardRecords) { |
|
89 |
log.debug("parsed record seems invalid"); |
|
90 |
discardRecord(record); |
|
91 |
} |
|
92 |
} |
|
85 | 93 |
} |
86 | 94 |
|
87 | 95 |
private void discardRecord(final String record) { |
... | ... | |
109 | 117 |
discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED); |
110 | 118 |
} |
111 | 119 |
|
112 |
private DBObject buildDBObject(final String record, final Map<String, String> recordProperties, final Map<String, List<String>> indexFieldProperties) {
|
|
113 |
final DBObject obj = new BasicDBObject();
|
|
120 |
private DBObject buildDBObject(final String record, final Map<String, String> recordProperties, final Map<String, List<String>> indexFieldProperties) {
|
|
121 |
final DBObject obj = new BasicDBObject();
|
|
114 | 122 |
obj.put("id", recordProperties.get("id")); |
115 | 123 |
obj.put("originalId", recordProperties.get("originalId")); |
116 | 124 |
obj.put("body", record); |
117 | 125 |
obj.put("timestamp", System.currentTimeMillis()); |
118 |
if (indexFieldProperties != null)
|
|
119 |
obj.putAll(indexFieldProperties);
|
|
120 |
return obj;
|
|
126 |
if (indexFieldProperties != null)
|
|
127 |
obj.putAll(indexFieldProperties);
|
|
128 |
return obj;
|
|
121 | 129 |
} |
122 | 130 |
|
123 | 131 |
private MongoCollection<DBObject> getCollectionWithWriteConcern(MongoCollection<DBObject> collection, WriteConcern writeConcern) { |
Also available in: Unified diff
Stroing in mdstore fails if the VTD Parser cannot properly apply the xpaths on records to index