Project

General

Profile

« Previous | Next » 

Revision 46517

metadata and transactionInfo managed using spring data repository

View differences:

MongoBulkWritesManager.java
12 12
import org.apache.commons.logging.Log;
13 13
import org.apache.commons.logging.LogFactory;
14 14

  
15
import static eu.dnetlib.data.mdstore.MDStoreConstants.*;
15
import static eu.dnetlib.data.mdstore.utils.MDStoreConstants.*;
16 16

  
17 17
public class MongoBulkWritesManager {
18 18

  
19 19
	private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
20
	private final boolean discardRecords;
21
	private MongoCollection<MDStoreRecord> validCollection;
22
	private List<WriteModel<MDStoreRecord>> validBulkOperationList;
20
	private MongoCollection<MDStoreRecord> collection;
21
	private List<WriteModel<MDStoreRecord>> validBulkOpList;
23 22

  
24 23
	private BulkWriteOptions writeOptions;
25
	private MongoCollection<MDStoreRecord> discardedCollection;
26
	private List<WriteModel<MDStoreRecord>> discardedBulkOperationList;
24
	private MongoCollection<MDStoreRecord> discarded;
25
	private List<WriteModel<MDStoreRecord>> discardedBulkOpList;
27 26

  
28 27
	private int bulkSize;
29 28

  
30
	public MongoBulkWritesManager(final MongoCollection<MDStoreRecord> collection,
31
			final MongoCollection<MDStoreRecord> discardedCollection,
32
			final int bulkSize,
33
			final boolean discardRecords) {
34
		this.validCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
35
		this.validBulkOperationList = Lists.newArrayList();
29
	public MongoBulkWritesManager(
30
			final MongoCollection<MDStoreRecord> collection,
31
			final MongoCollection<MDStoreRecord> discarded,
32
			final int bulkSize) {
33
		this.collection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
34
		this.validBulkOpList = Lists.newArrayList();
36 35

  
37
		this.discardedCollection = discardedCollection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
38
		this.discardedBulkOperationList = Lists.newArrayList();
36
		this.discarded = discarded.withWriteConcern(WriteConcern.ACKNOWLEDGED);
37
		this.discardedBulkOpList = Lists.newArrayList();
39 38

  
40 39
		this.bulkSize = bulkSize;
41
		this.discardRecords = discardRecords;
42 40
		this.writeOptions = new BulkWriteOptions().ordered(false);
43 41
	}
44 42

  
......
49 47
				if (log.isDebugEnabled()) {
50 48
					log.debug("Saving object" + r.getId());
51 49
				}
52
				validBulkOperationList.add(new ReplaceOneModel(new BasicDBObject(ID, r.getId()), r, new UpdateOptions().upsert(true)));
53
				if (((validBulkOperationList.size() % bulkSize) == 0) && !validBulkOperationList.isEmpty()) {
54
					validCollection.bulkWrite(validBulkOperationList, writeOptions);
55
					validBulkOperationList.clear();
50
				validBulkOpList.add(new ReplaceOneModel(new BasicDBObject(ID, r.getId()), r, new UpdateOptions().upsert(true)));
51
				if (((validBulkOpList.size() % bulkSize) == 0) && !validBulkOpList.isEmpty()) {
52
					collection.bulkWrite(validBulkOpList, writeOptions);
53
					validBulkOpList.clear();
56 54
				}
57 55
			} else {
58
				if (discardRecords) {
59
					log.debug("parsed record seems invalid");
60
					discardRecord(r);
61
				}
56
				log.debug("parsed record seems invalid");
57
				discardRecord(r);
62 58
			}
63 59
		} catch (Throwable e) {
64
			if (discardRecords) {
65
				log.debug("unhandled exception: " + e.getMessage());
66
				discardRecord(r);
67
			}
60
			log.debug("unhandled exception: " + e.getMessage());
61
			discardRecord(r);
68 62
		}
69 63
	}
70 64

  
71 65
	private void discardRecord(final MDStoreRecord MDStoreRecord) {
72
		discardedBulkOperationList.add(new InsertOneModel(new BasicDBObject(BODY, MDStoreRecord)));
66
		discardedBulkOpList.add(new InsertOneModel(new BasicDBObject(BODY, MDStoreRecord)));
73 67

  
74
		if (((discardedBulkOperationList.size() % bulkSize) == 0) && !discardedBulkOperationList.isEmpty()) {
75
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
76
			discardedBulkOperationList.clear();
68
		if (((discardedBulkOpList.size() % bulkSize) == 0) && !discardedBulkOpList.isEmpty()) {
69
			discarded.bulkWrite(discardedBulkOpList, writeOptions);
70
			discardedBulkOpList.clear();
77 71
		}
78 72
	}
79 73

  
80 74
	public void flushBulks() {
81 75
		//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
82 76
		//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
83
		if (!validBulkOperationList.isEmpty()) {
84
			validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.JOURNALED);
85
			validCollection.bulkWrite(validBulkOperationList, writeOptions);
77
		if (!validBulkOpList.isEmpty()) {
78
			collection = getCollectionWithWriteConcern(collection, WriteConcern.JOURNALED);
79
			collection.bulkWrite(validBulkOpList, writeOptions);
86 80
		}
87
		if (!discardedBulkOperationList.isEmpty()) {
88
			discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
89
			discardedCollection.bulkWrite(discardedBulkOperationList, writeOptions);
81
		if (!discardedBulkOpList.isEmpty()) {
82
			discarded = getCollectionWithWriteConcern(discarded, WriteConcern.ACKNOWLEDGED);
83
			discarded.bulkWrite(discardedBulkOpList, writeOptions);
90 84
		}
91 85
		//setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode
92
		validCollection = getCollectionWithWriteConcern(validCollection, WriteConcern.ACKNOWLEDGED);
93
		discardedCollection = getCollectionWithWriteConcern(discardedCollection, WriteConcern.ACKNOWLEDGED);
86
		collection = getCollectionWithWriteConcern(collection, WriteConcern.ACKNOWLEDGED);
87
		discarded = getCollectionWithWriteConcern(discarded, WriteConcern.ACKNOWLEDGED);
94 88
	}
95 89

  
96 90
	private MongoCollection<MDStoreRecord> getCollectionWithWriteConcern(MongoCollection<MDStoreRecord> collection, WriteConcern writeConcern) {

Also available in: Unified diff