Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.List;
4
import java.util.Map;
5

    
6
import com.google.common.collect.Lists;
7
import com.mongodb.BasicDBObject;
8
import com.mongodb.DBObject;
9
import com.mongodb.WriteConcern;
10
import com.mongodb.client.MongoCollection;
11
import com.mongodb.client.MongoDatabase;
12
import com.mongodb.client.model.*;
13
import eu.dnetlib.data.mdstore.modular.RecordParser;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16

    
17
public class MongoBulkWritesManager {
18

    
19
	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
20
	private final boolean discardRecords;
21
	private RecordParser recordParser;
22
	private MongoCollection validCollection;
23
	private List<WriteModel> validBulkOperationList;
24
	private int validOpCounter;
25
	private BulkWriteOptions writeOptions;
26
	private MongoCollection discardedCollection;
27
	private List<WriteModel> discardedBulkOperationList;
28
	private int discardedOpCounter;
29
	private int bulkSize;
30

    
31
	public MongoBulkWritesManager(final MongoCollection collection, final MongoCollection discardedCollection, final int bulkSize, final RecordParser parser,
32
			final boolean discardRecords) {
33
		this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
34
		this.validBulkOperationList = Lists.newArrayList();
35
		this.validOpCounter = 0;
36
		this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
37
		this.discardedBulkOperationList = Lists.newArrayList();
38
		this.discardedOpCounter = 0;
39
		this.bulkSize = bulkSize;
40
		this.recordParser = parser;
41
		this.discardRecords = discardRecords;
42
		this.writeOptions = new BulkWriteOptions().ordered(false);
43
	}
44

    
45
	public void insert(final String record) {
46
		try {
47
			final Map<String, String> recordProperties = recordParser.parseRecord(record);
48
			log.debug("found props: " + recordProperties);
49
			if (recordProperties.containsKey("id")) {
50
				final DBObject obj = buildDBObject(record, recordProperties);
51
				if (log.isDebugEnabled()) {
52
					log.debug("Saving object" + obj);
53
				}
54
				validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true)));
55
				validOpCounter++;
56
				if (((validOpCounter % bulkSize) == 0) && (validOpCounter != 0)) {
57
					validCollection.bulkWrite(validBulkOperationList, writeOptions);
58
					validBulkOperationList = Lists.newArrayList();
59
				}
60
			} else {
61
				if (discardRecords) {
62
					log.debug("parsed record seems invalid");
63
					discardRecord(record);
64
				}
65
			}
66
		} catch (Throwable e) {
67
			if (discardRecords) {
68
				log.debug("unhandled exception: " + e.getMessage());
69
				discardRecord(record);
70
			}
71
		}
72
	}
73

    
74
	private void discardRecord(final String record) {
75
		discardedBulkOperationList.add(new InsertOneModel(new BasicDBObject("body", record)));
76
		discardedOpCounter++;
77
		if (((discardedOpCounter % bulkSize) == 0) && (discardedOpCounter != 0)) {
78
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
79
			discardedBulkOperationList = Lists.newArrayList();
80
		}
81
	}
82

    
83
	public void flushBulks(MongoDatabase mongoDatabase) {
84
		//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
85
		//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
86
		if (validOpCounter != 0) {
87
			validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
88
			validCollection.bulkWrite(validBulkOperationList, writeOptions);
89
		}
90
		if (discardedOpCounter != 0) {
91
			discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.JOURNALED);
92
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
93
		}
94
		//setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode
95
		validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
96
		discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
97
	}
98

    
99
	private DBObject buildDBObject(final String record, final Map<String, String> recordProperties) {
100
		final DBObject obj = new BasicDBObject();
101
		obj.put("id", recordProperties.get("id"));
102
		obj.put("originalId", recordProperties.get("originalId"));
103
		obj.put("body", record);
104
		obj.put("timestamp", System.currentTimeMillis());
105
		return obj;
106
	}
107

    
108
	private MongoCollection<DBObject> getCollectionWithWriteConcern(MongoCollection<DBObject> collection, WriteConcern writeConcern) {
109
		return collection.withWriteConcern(writeConcern);
110
	}
111

    
112
}
(3-3/5)