package eu.dnetlib.data.mdstore.modular.mongodb;

import java.util.List;
import java.util.Map;
import javax.xml.stream.XMLStreamException;

import com.google.common.collect.Lists;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import eu.dnetlib.data.mdstore.modular.RecordParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.BsonSerializationException;

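/**
 * Collects metadata records into MongoDB bulk write operations and flushes them to the valid
 * collection in batches of {@code bulkSize}. Records that cannot be parsed, or that lack an
 * "id" property, are optionally stored in a discarded-records collection.
 */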
public class MongoBulkWritesManager {

	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);

	private final boolean discardRecords;
	private RecordParser recordParser;
	private MongoCollection<DBObject> validCollection;
	private List<WriteModel<DBObject>> validBulkOperationList;
	private BulkWriteOptions writeOptions;
	private MongoCollection<DBObject> discardedCollection;
	private List<WriteModel<DBObject>> discardedBulkOperationList;
	private int discardedOpCounter;
	private int bulkSize;

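	/**
	 * Creates a bulk writes manager.
	 *
	 * @param collection          target collection for valid records, used with the journaled write concern
	 * @param discardedCollection collection for discarded records, used with the acknowledged write concern
	 * @param bulkSize            number of queued operations that triggers a bulk write
	 * @param parser              parser used to extract record properties (e.g. "id", "originalId")
	 * @param discardRecords      whether invalid or unparsable records are stored in the discarded collection
	 */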
	public MongoBulkWritesManager(final MongoCollection<DBObject> collection,
			final MongoCollection<DBObject> discardedCollection,
			final int bulkSize,
			final RecordParser parser,
			final boolean discardRecords) {
		this.validCollection = collection.withWriteConcern(WriteConcern.JOURNALED);
		this.validBulkOperationList = Lists.newArrayList();
		this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
		this.discardedBulkOperationList = Lists.newArrayList();
		this.discardedOpCounter = 0;
		this.bulkSize = bulkSize;
		this.recordParser = parser;
		this.discardRecords = discardRecords;
		this.writeOptions = new BulkWriteOptions().ordered(false);
	}

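	/**
	 * Parses the given record and queues an upsert, keyed on the parsed "id", for the valid
	 * collection; once {@code bulkSize} operations are queued they are written in a single
	 * unordered bulk write. Records without an "id", or failing XML parsing, are routed to
	 * the discarded collection when {@code discardRecords} is enabled.
	 *
	 * @param record the XML record body
	 */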
	public void insert(final String record) {
		try {
			final Map<String, String> recordProperties = recordParser.parseRecord(record);
			//log.debug("found props: " + recordProperties);
			if (recordProperties.containsKey("id")) {
				if (log.isDebugEnabled()) {
					log.debug("Parsed record id " + recordProperties.get("id"));
				}
				final DBObject obj = buildDBObject(record, recordProperties);
//				if (log.isDebugEnabled()) {
//					log.debug("Saving object" + obj);
//				}
				validBulkOperationList.add(new ReplaceOneModel<>(new BasicDBObject("id", obj.get("id")), obj, new UpdateOptions().upsert(true)));
				if (((validBulkOperationList.size() % bulkSize) == 0) && (!validBulkOperationList.isEmpty())) {
					log.debug("Bulk writing #records: " + validBulkOperationList.size());
					final BulkWriteResult res = validCollection.bulkWrite(validBulkOperationList, writeOptions);
					if (log.isDebugEnabled()) {
						log.debug("Inserted: " + res.getInsertedCount());
						if (res.isModifiedCountAvailable()) log.debug("Modified: " + res.getModifiedCount());
					}
					validBulkOperationList = Lists.newArrayList();
				}
			} else {
				if (discardRecords) {
					log.debug("parsed record has no 'id' property, discarding it");
					discardRecord(record);
				}
			}
		} catch (BsonSerializationException e) {
			log.error("Probably too big XML record: " + e.getMessage(), e);
			log.error("Dropping operation list -- #ops " + validBulkOperationList.size());
			validBulkOperationList = Lists.newArrayList();
		} catch (XMLStreamException e) {
			if (discardRecords) {
				log.error("error parsing record: " + e.getMessage(), e);
				discardRecord(record);
			}
		}
	}

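	/**
	 * Stores the raw record body in the discarded-records collection.
	 */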
	private void discardRecord(final String record) {
		discardedCollection.insertOne(new BasicDBObject("body", record));
	}

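	/**
	 * Writes any operations still queued for the valid collection. Because the collection uses
	 * the journaled write concern, when the bulk write returns the data has been flushed to disk.
	 */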
	public void flushBulks() {
		// The valid collection uses the journaled write concern, so when the bulk write returns
		// everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk).
		// The explicit fsync command can't be run anymore: "Command failed with error 13: 'fsync may only be run against the admin database.'"
		if (!validBulkOperationList.isEmpty()) {
			// validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
			validCollection.bulkWrite(validBulkOperationList, writeOptions);
		}

		// Setting the write concern back to ACKNOWLEDGED would avoid executing all future writes in journaled mode:
		// validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
		// discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
	}

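	/**
	 * Builds the MongoDB document for a record: the parsed "id" and "originalId" properties,
	 * the raw record body and the current timestamp.
	 */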
	private DBObject buildDBObject(final String record, final Map<String, String> recordProperties) {
		final DBObject obj = new BasicDBObject();
		obj.put("id", recordProperties.get("id"));
		obj.put("originalId", recordProperties.get("originalId"));
		obj.put("body", record);
		obj.put("timestamp", System.currentTimeMillis());
		return obj;
	}

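	/**
	 * Returns a view of the given collection configured with the given write concern;
	 * currently only referenced by the commented-out code in {@link #flushBulks()}.
	 */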
	private MongoCollection<DBObject> getCollectionWithWriteConcern(MongoCollection<DBObject> collection, WriteConcern writeConcern) {
		return collection.withWriteConcern(writeConcern);
	}

}
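/*
 * Minimal usage sketch, for orientation only. The collection names, the bulk size of 100 and the
 * SomeRecordParser implementation are illustrative assumptions, not part of this codebase:
 *
 *   MongoDatabase db = mongoClient.getDatabase("mdstore_db");
 *   MongoCollection<DBObject> valid = db.getCollection("md_store", DBObject.class);
 *   MongoCollection<DBObject> discarded = db.getCollection("discarded_md_store", DBObject.class);
 *
 *   RecordParser parser = new SomeRecordParser(); // any RecordParser implementation
 *   MongoBulkWritesManager manager = new MongoBulkWritesManager(valid, discarded, 100, parser, true);
 *
 *   for (String xmlRecord : records) {
 *       manager.insert(xmlRecord);
 *   }
 *   manager.flushBulks();
 */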