Project

General

Profile

1
package eu.dnetlib.data.mdstore.modular.newmongodb;
2

    
3

    
4
import com.google.common.collect.Lists;
5
import com.mongodb.BasicDBObject;
6
import com.mongodb.WriteConcern;
7
import com.mongodb.client.MongoCollection;
8
import com.mongodb.client.model.*;
9
import eu.dnetlib.data.mdstore.modular.newmongodb.model.MDStoreRecord;
10
import org.apache.commons.lang3.StringUtils;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.bson.Document;
14

    
15
import java.util.List;
16
import static eu.dnetlib.data.mdstore.modular.newmongodb.model.MDStoreEncoder.*;
17
import static eu.dnetlib.data.mdstore.modular.mongodb.utils.MDStoreConstants.*;
18
/**
19
 * Created by sandro on 5/23/17.
20
 */
21
public class MongoBulkWritesManager {
22

    
23
    private static final Log log = LogFactory.getLog(MongoBulkWritesManager.class);
24
    private MongoCollection<Document> collection;
25
    private List<WriteModel<Document>> validBulkOpList;
26

    
27
    private BulkWriteOptions writeOptions;
28
    private MongoCollection<Document> discarded;
29
    private List<WriteModel<Document>> discardedBulkOpList;
30

    
31
    private int bulkSize;
32

    
33
    public MongoBulkWritesManager(
34
            final MongoCollection<Document> collection,
35
            final MongoCollection<Document> discarded,
36
            final int bulkSize) {
37
        this.collection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED);
38
        this.validBulkOpList = Lists.newArrayList();
39

    
40
        this.discarded = discarded.withWriteConcern(WriteConcern.ACKNOWLEDGED);
41
        this.discardedBulkOpList = Lists.newArrayList();
42

    
43
        this.bulkSize = bulkSize;
44
        this.writeOptions = new BulkWriteOptions().ordered(false);
45
    }
46

    
47
    public void insert(final MDStoreRecord r) {
48
        try {
49
            if (StringUtils.isNotBlank(r.getRecordIdentifier())) {
50

    
51
                if (log.isDebugEnabled()) {
52
                    log.debug("Saving object" + r.getRecordIdentifier());
53
                }
54
                new InsertOneModel<>(encodeMDStoreRecord(r));
55
                validBulkOpList.add(new InsertOneModel<>(encodeMDStoreRecord(r)));
56
                if (((validBulkOpList.size() % bulkSize) == 0) && !validBulkOpList.isEmpty()) {
57
                    collection.bulkWrite(validBulkOpList, writeOptions);
58
                    validBulkOpList.clear();
59
                }
60
            } else {
61
                log.debug("parsed record seems invalid");
62
                discardRecord(r);
63
            }
64
        } catch (Throwable e) {
65
            log.debug("unhandled exception: " + e.getMessage());
66
            discardRecord(r);
67
        }
68
    }
69

    
70
    private void discardRecord(final MDStoreRecord record)  {
71
        try {
72
            final Document result = new Document();
73
            result.put(BODY, record.getBody());
74
            discardedBulkOpList.add(new InsertOneModel<>(result ));
75
            if (((discardedBulkOpList.size() % bulkSize) == 0) && !discardedBulkOpList.isEmpty()) {
76
                discarded.bulkWrite(discardedBulkOpList, writeOptions);
77
                discardedBulkOpList.clear();
78
            }
79
        } catch (Throwable e) {
80
            log.error("Error on discard Record", e);
81
        }
82
    }
83

    
84
    public void flushBulks() {
85
        //setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk)
86
        //the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.'
87
        if (!validBulkOpList.isEmpty()) {
88
            collection = getCollectionWithWriteConcern(collection, WriteConcern.JOURNALED);
89
            collection.bulkWrite(validBulkOpList, writeOptions);
90
        }
91
        if (!discardedBulkOpList.isEmpty()) {
92
            discarded = getCollectionWithWriteConcern(discarded, WriteConcern.ACKNOWLEDGED);
93
            discarded.bulkWrite(discardedBulkOpList, writeOptions);
94
        }
95
        //setting write concern back to ACKNOWLEDGE to avoid the execution of future writes all in Journaled mode
96
        collection = getCollectionWithWriteConcern(collection, WriteConcern.ACKNOWLEDGED);
97
        discarded = getCollectionWithWriteConcern(discarded, WriteConcern.ACKNOWLEDGED);
98
    }
99

    
100
    private MongoCollection<Document> getCollectionWithWriteConcern(MongoCollection<Document> collection, WriteConcern writeConcern) {
101
        return collection.withWriteConcern(writeConcern);
102
    }
103
}
(2-2/5)