Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

    
3
import java.util.Map;
4

    
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7

    
8
import com.mongodb.BasicDBObject;
9
import com.mongodb.BulkUpdateRequestBuilder;
10
import com.mongodb.BulkWriteOperation;
11
import com.mongodb.BulkWriteRequestBuilder;
12
import com.mongodb.DBCollection;
13
import com.mongodb.DBObject;
14
import com.mongodb.WriteConcern;
15

    
16
import eu.dnetlib.data.mdstore.modular.RecordParser;
17

    
18
public class MongoBulkWritesManager {
19

    
20
	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
21

    
22
	private RecordParser recordParser;
23

    
24
	private DBCollection validCollection;
25
	private BulkWriteOperation validBulk;
26
	private int validOpCounter;
27

    
28
	private DBCollection discardedCollection;
29
	private BulkWriteOperation discardedBulk;
30
	private int discardedOpCounter;
31
	private int bulkSize;
32

    
33
	private final boolean discardRecords;
34

    
35
	public MongoBulkWritesManager(final DBCollection collection, final DBCollection discardedCollection, final int bulkSize, final RecordParser parser,
36
			final boolean discardRecords) {
37
		this.validCollection = collection;
38
		this.discardedCollection = discardedCollection;
39
		this.validBulk = collection.initializeUnorderedBulkOperation();
40
		this.validOpCounter = 0;
41
		this.discardedBulk = discardedCollection.initializeUnorderedBulkOperation();
42
		this.discardedOpCounter = 0;
43
		this.bulkSize = bulkSize;
44
		this.recordParser = parser;
45
		this.discardRecords = discardRecords;
46
	}
47

    
48
	public void insert(final String record) {
49
		try {
50
			final Map<String, String> recordProperties = recordParser.parseRecord(record);
51
			log.debug("found props: " + recordProperties);
52
			if (recordProperties.containsKey("id")) {
53
				final DBObject obj = new BasicDBObject();
54
				final String id = recordProperties.get("id");
55
				final String originalId = recordProperties.get("originalId");
56
				obj.put("id", id);
57
				obj.put("originalId", originalId);
58
				obj.put("body", record);
59
				obj.put("timestamp", System.currentTimeMillis());
60

    
61
				final BulkWriteRequestBuilder bulkWriteRequestBuilder = validBulk.find(new BasicDBObject("id", obj.get("id")));
62
				final BulkUpdateRequestBuilder updateReq = bulkWriteRequestBuilder.upsert();
63
				updateReq.replaceOne(obj);
64
				validOpCounter++;
65
				if (validOpCounter % bulkSize == 0 && validOpCounter != 0) {
66
					validBulk.execute(WriteConcern.ACKNOWLEDGED);
67
					validBulk = validCollection.initializeUnorderedBulkOperation();
68
				}
69
			} else {
70
				if (discardRecords) {
71
					log.debug("parsed record seems invalid");
72
					discardedBulk.insert(new BasicDBObject("body", record));
73
					discardedOpCounter++;
74
					if (discardedOpCounter % bulkSize == 0 && discardedOpCounter != 0) {
75
						discardedBulk.execute(WriteConcern.ACKNOWLEDGED);
76
						discardedBulk = discardedCollection.initializeUnorderedBulkOperation();
77
					}
78
				}
79
			}
80
		} catch (Throwable e) {
81
			if (discardRecords) {
82
				log.debug("unhandled exception: " + e.getMessage());
83
				discardedBulk.insert(new BasicDBObject("body", record));
84
				discardedOpCounter++;
85
				if (discardedOpCounter % bulkSize == 0 && discardedOpCounter != 0) {
86
					discardedBulk.execute(WriteConcern.ACKNOWLEDGED);
87
					discardedBulk = discardedCollection.initializeUnorderedBulkOperation();
88
				}
89
			}
90
		}
91
	}
92

    
93
	public void flushBulks() {
94
		if (validOpCounter != 0) {
95
			validBulk.execute(WriteConcern.ACKNOWLEDGED);
96
			validCollection.getDB().command("{fsync:1}");
97
		}
98

    
99
		if (discardedOpCounter != 0) {
100
			discardedBulk.execute(WriteConcern.ACKNOWLEDGED);
101
			discardedCollection.getDB().command("{fsync:1}");
102
		}
103
	}
104

    
105
}
(3-3/5)