Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.List;
4
import java.util.Map;
5

    
6
import com.google.common.collect.Lists;
7
import com.mongodb.BasicDBObject;
8
import com.mongodb.DBObject;
9
import com.mongodb.WriteConcern;
10
import com.mongodb.client.MongoCollection;
11
import com.mongodb.client.model.*;
12
import eu.dnetlib.data.mdstore.MDStoreServiceException;
13
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
14
import eu.dnetlib.data.mdstore.modular.RecordParser;
15
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParser;
16
import eu.dnetlib.data.mdstore.modular.mongodb.utils.IndexFieldRecordParserException;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19

    
20
public class MongoBulkWritesManager {
21

    
22
	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
23
	private final boolean discardRecords;
24
	private final boolean indexRecords;
25
	private final IndexFieldRecordParser indexFieldRecordParser = new IndexFieldRecordParser();
26
	private final List<MDFormatDescription> mdref;
27
	private RecordParser recordParser;
28
	private MongoCollection<DBObject> validCollection;
29
	private List<WriteModel<DBObject>> validBulkOperationList;
30
	private BulkWriteOptions writeOptions;
31
	private MongoCollection<DBObject> discardedCollection;
32
	private List<WriteModel<DBObject>> discardedBulkOperationList;
33
	private int bulkSize;
34

    
35
	public MongoBulkWritesManager(final MongoCollection<DBObject> collection,
36
			final MongoCollection<DBObject> discardedCollection,
37
			final List<MDFormatDescription> mdref,
38
			final int bulkSize,
39
			final RecordParser parser,
40
			final boolean discardRecords) {
41
		this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
42
		this.validBulkOperationList = Lists.newArrayList();
43

    
44
		this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
45
		this.discardedBulkOperationList = Lists.newArrayList();
46

    
47
		this.bulkSize = bulkSize;
48
		this.recordParser = parser;
49
		this.discardRecords = discardRecords;
50
		this.mdref = mdref;
51

    
52
		this.indexRecords = this.mdref != null;
53
		this.writeOptions = new BulkWriteOptions().ordered(false);
54
	}
55

    
56
	public void insert(final String record) throws MDStoreServiceException {
57
		Map<String, String> recordProperties = null;
58
		try {
59
			recordProperties = recordParser.parseRecord(record);
60
		} catch (Throwable e) {
61
			if (discardRecords) {
62
				log.debug("unhandled exception: " + e.getMessage());
63
				discardRecord(record);
64
			}
65
		}
66
		Map<String, List<String>> indexRecordField = null;
67
		try {
68
			if (indexRecords) {
69
				indexRecordField = indexFieldRecordParser.parseRecord(record, mdref);
70
			}
71
		} catch (IndexFieldRecordParserException e) {
72
			// could not index record fields
73
			throw new MDStoreServiceException("Are you using the correct type of store / index definition for the records in " + validCollection.getNamespace() + " ?", e);
74
		}
75

    
76
		log.debug("found props: " + recordProperties);
77
		if (recordProperties.containsKey("id")) {
78
			final DBObject obj = buildDBObject(record, recordProperties, indexRecordField);
79
			if (log.isDebugEnabled()) {
80
				log.debug("Saving object" + obj);
81
			}
82
			validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true)));
83
			if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) {
84
				validCollection.bulkWrite(validBulkOperationList, writeOptions);
85
				validBulkOperationList.clear();
86
			}
87
		} else {
88
			if (discardRecords) {
89
				log.debug("parsed record seems invalid");
90
				discardRecord(record);
91
			}
92
		}
93
	}
94

    
95
	private void discardRecord(final String record) {
96
		discardedBulkOperationList.add(new InsertOneModel(new BasicDBObject("body", record)));
97

    
98
		if (((discardedBulkOperationList.size() % bulkSize) == 0) && !discardedBulkOperationList.isEmpty()) {
99
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
100
			discardedBulkOperationList.clear();
101
		}
102
	}
103

    
104
	public void flushBulks() {
105
		//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
106
		//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
107
		if (!validBulkOperationList.isEmpty()) {
108
			validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
109
			validCollection.bulkWrite(validBulkOperationList, writeOptions);
110
		}
111
		if (!discardedBulkOperationList.isEmpty()) {
112
			discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.JOURNALED);
113
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
114
		}
115
		//setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode
116
		validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
117
		discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
118
	}
119

    
120
	private DBObject buildDBObject(final String record, final Map<String, String> recordProperties, final Map<String, List<String>> indexFieldProperties) {
121
		final DBObject obj = new BasicDBObject();
122
		obj.put("id", recordProperties.get("id"));
123
		obj.put("originalId", recordProperties.get("originalId"));
124
		obj.put("body", record);
125
		obj.put("timestamp", System.currentTimeMillis());
126
		if (indexFieldProperties != null)
127
			obj.putAll(indexFieldProperties);
128
		return obj;
129
	}
130

    
131
	private MongoCollection<DBObject> getCollectionWithWriteConcern(MongoCollection<DBObject> collection, WriteConcern writeConcern) {
132
		return collection.withWriteConcern(writeConcern);
133
	}
134

    
135
}
(3-3/5)