Project

General

Profile

1 33875 andrea.man
package eu.dnetlib.data.mdstore.modular.mongodb;
2
3 41551 alessia.ba
import java.util.List;
4 33875 andrea.man
import java.util.Map;
5
6 41551 alessia.ba
import com.google.common.collect.Lists;
7 33875 andrea.man
import com.mongodb.BasicDBObject;
8
import com.mongodb.DBObject;
9
import com.mongodb.WriteConcern;
10 41551 alessia.ba
import com.mongodb.client.MongoCollection;
11
import com.mongodb.client.MongoDatabase;
12
import com.mongodb.client.model.*;
13 33875 andrea.man
import eu.dnetlib.data.mdstore.modular.RecordParser;
14 41551 alessia.ba
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16 33875 andrea.man
17
public class MongoBulkWritesManager {
18
19
	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
20 41551 alessia.ba
	private final boolean discardRecords;
21 33875 andrea.man
	private RecordParser recordParser;
22 41551 alessia.ba
	private MongoCollection validCollection;
23
	private List<WriteModel> validBulkOperationList;
24 33875 andrea.man
	private int validOpCounter;
25 41551 alessia.ba
	private BulkWriteOptions writeOptions;
26
	private MongoCollection discardedCollection;
27
	private List<WriteModel> discardedBulkOperationList;
28 33875 andrea.man
	private int discardedOpCounter;
29
	private int bulkSize;
30
31 41551 alessia.ba
	public MongoBulkWritesManager(final MongoCollection collection, final MongoCollection discardedCollection, final int bulkSize, final RecordParser parser,
32
			final boolean discardRecords) {
33
		this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
34
		this.validBulkOperationList = Lists.newArrayList();
35 33875 andrea.man
		this.validOpCounter = 0;
36 41551 alessia.ba
		this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
37
		this.discardedBulkOperationList = Lists.newArrayList();
38 33875 andrea.man
		this.discardedOpCounter = 0;
39
		this.bulkSize = bulkSize;
40
		this.recordParser = parser;
41
		this.discardRecords = discardRecords;
42 41551 alessia.ba
		this.writeOptions = new BulkWriteOptions().ordered(false);
43 33875 andrea.man
	}
44
45
	public void insert(final String record) {
46
		try {
47
			final Map<String, String> recordProperties = recordParser.parseRecord(record);
48
			log.debug("found props: " + recordProperties);
49
			if (recordProperties.containsKey("id")) {
50 38852 claudio.at
				final DBObject obj = buildDBObject(record, recordProperties);
51 37807 michele.ar
				if (log.isDebugEnabled()) {
52
					log.debug("Saving object" + obj);
53
				}
54 41551 alessia.ba
				validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true)));
55 33875 andrea.man
				validOpCounter++;
56 38852 claudio.at
				if (((validOpCounter % bulkSize) == 0) && (validOpCounter != 0)) {
57 41551 alessia.ba
					validCollection.bulkWrite(validBulkOperationList, writeOptions);
58
					validBulkOperationList = Lists.newArrayList();
59 33875 andrea.man
				}
60
			} else {
61
				if (discardRecords) {
62
					log.debug("parsed record seems invalid");
63 41551 alessia.ba
					discardRecord(record);
64 33875 andrea.man
				}
65
			}
66
		} catch (Throwable e) {
67
			if (discardRecords) {
68
				log.debug("unhandled exception: " + e.getMessage());
69 41551 alessia.ba
				discardRecord(record);
70 33875 andrea.man
			}
71
		}
72
	}
73
74 41551 alessia.ba
	private void discardRecord(final String record) {
75
		discardedBulkOperationList.add(new InsertOneModel(new BasicDBObject("body", record)));
76
		discardedOpCounter++;
77
		if (((discardedOpCounter % bulkSize) == 0) && (discardedOpCounter != 0)) {
78
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
79
			discardedBulkOperationList = Lists.newArrayList();
80
		}
81
	}
82
83
	public void flushBulks(MongoDatabase mongoDatabase) {
84 41578 alessia.ba
		//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
85
		//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
86 33875 andrea.man
		if (validOpCounter != 0) {
87 41578 alessia.ba
			validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
88 41551 alessia.ba
			validCollection.bulkWrite(validBulkOperationList, writeOptions);
89 33875 andrea.man
		}
90
		if (discardedOpCounter != 0) {
91 41578 alessia.ba
			discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.JOURNALED);
92 41551 alessia.ba
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
93 33875 andrea.man
		}
94 41578 alessia.ba
		//setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode
95
		validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
96
		discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
97 33875 andrea.man
	}
98
99 38852 claudio.at
	private DBObject buildDBObject(final String record, final Map<String, String> recordProperties) {
100
		final DBObject obj = new BasicDBObject();
101
		obj.put("id", recordProperties.get("id"));
102
		obj.put("originalId", recordProperties.get("originalId"));
103
		obj.put("body", record);
104
		obj.put("timestamp", System.currentTimeMillis());
105
		return obj;
106
	}
107
108 41578 alessia.ba
	private MongoCollection<DBObject> getCollectionWithWriteConcern(MongoCollection<DBObject> collection, WriteConcern writeConcern) {
109
		return collection.withWriteConcern(writeConcern);
110
	}
111
112 33875 andrea.man
}