Project

General

Profile

1
package eu.dnetlib.pid.resolver.mdstore.plugin;
2

    
3
import com.mongodb.BasicDBObject;
4
import com.mongodb.BasicDBObjectBuilder;
5
import com.mongodb.DBObject;
6
import com.mongodb.client.FindIterable;
7
import com.mongodb.client.MongoCollection;
8
import com.mongodb.client.MongoDatabase;
9
import com.mongodb.client.model.Filters;
10
import com.mongodb.client.model.UpdateOptions;
11
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
12
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
13
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
14
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
15
import eu.dnetlib.pid.resolver.PIDResolver;
16
import eu.dnetlib.rmi.data.MDStoreServiceException;
17
import org.apache.commons.lang3.StringUtils;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.bson.Document;
21
import org.bson.conversions.Bson;
22
import org.springframework.beans.factory.annotation.Autowired;
23

    
24
import java.util.ArrayList;
25
import java.util.List;
26
import java.util.Map;
27
import java.util.concurrent.*;
28

    
29
import static com.mongodb.client.model.Filters.eq;
30

    
31
/**
32
 * Created by sandro on 9/9/16.
33
 */
34
public class ResolverMDStorePlugin implements MDStorePlugin {
35

    
36
	private static final Log log = LogFactory.getLog(ResolverMDStorePlugin.class);
37
	public static DBObject DONE = new BasicDBObject();
38

    
39
	@Autowired
40
	private List<PIDResolver> pluginResolver;
41

    
42

    
43
	@Autowired
44
	private ResolverSerializer resolverSerializer;
45

    
46
	@Autowired
47
	private RecordResolverFactory recordResolverFactory;
48

    
49

    
50
	@Autowired
51
	private MDStoreTransactionManagerImpl transactionManager;
52

    
53
	public static void save(MongoCollection<DBObject> collection, DBObject document) {
54
		Object id = document.get("_id");
55
		if (id == null) {
56
			collection.insertOne(document);
57
		} else {
58
			collection.replaceOne(eq("_id", id), document, new UpdateOptions().upsert(true));
59
		}
60
	}
61

    
62
	@Override
63
	public void run(final MDStoreDao dao, final Map<String, String> params, final DoneCallback doneCallback) throws MDStoreServiceException {
64

    
65
		try {
66

    
67
			final String id = params.get("mdStoreId");
68

    
69
            int numberOfThreads = 4;
70

    
71
            final String numOfThreadsString = params.get("numberOfThreads");
72

    
73
            try {
74
                if (!StringUtils.isEmpty(numOfThreadsString)) {
75
                    numberOfThreads = Integer.parseInt(numOfThreadsString);
76
                }
77

    
78
            } catch (Throwable e) {
79
                log.error("Number of threads Param is not an int value it will apply by default 4");
80
            }
81

    
82
			final boolean refresh = params.get("refresh") != null && Boolean.parseBoolean(params.get("refresh"));
83

    
84
			final String internalId = transactionManager.readMdStore(id);
85

    
86
			final MongoDatabase db = transactionManager.getDb();
87

    
88
			final MongoCollection<DBObject> currentMdStoreCollection = db.getCollection(internalId, DBObject.class);
89

    
90
			final MongoCollection<DBObject> resolvedRecord = db.getCollection("resolved_" + StringUtils.substringBefore(id, "_"), DBObject.class);
91

    
92
            final BasicDBObject idx = new BasicDBObject();
93
            idx.put("resolved_ts", 1);
94
            resolvedRecord.createIndex(idx);
95

    
96
            upsertResolved(currentMdStoreCollection, resolvedRecord, 0);
97

    
98
			if (refresh) {
99
				resolvedRecord.drop();
100
			}
101

    
102
			final FindIterable<DBObject> mdstoreRecords = currentMdStoreCollection.find();
103

    
104
            mdstoreRecords.noCursorTimeout(true);
105

    
106
			final BlockingQueue<DBObject> queue = new ArrayBlockingQueue<>(100);
107

    
108
			final List<Future<Boolean>> responses = new ArrayList<>();
109

    
110
			final ExecutorService executor = Executors.newFixedThreadPool(100);
111

    
112
			final long total = currentMdStoreCollection.count();
113

    
114
			int previousPrintValue = -1;
115
            int currentPerc;
116

    
117
            final long ts = System.currentTimeMillis();
118

    
119
            for (int i = 0; i < numberOfThreads; i++) {
120
				final RecordResolver resolver = recordResolverFactory.createResolver(ts, queue, resolvedRecord, resolverSerializer, pluginResolver);
121
				responses.add(executor.submit(resolver));
122
			}
123

    
124
			int parsed = 0;
125

    
126
			for (DBObject currentMdStoreRecord : mdstoreRecords) {
127
				queue.put(currentMdStoreRecord);
128

    
129
				currentPerc = Math.round(((float) ++parsed / (float) total) * 100.0F);
130

    
131
				if (currentPerc != previousPrintValue) {
132
					log.info("Resolving process " + currentPerc + " %");
133
					previousPrintValue = currentPerc;
134
				}
135
			}
136
			queue.put(DONE);
137

    
138
			for (Future<Boolean> response : responses) {
139
				response.get();
140
			}
141
            upsertResolved(currentMdStoreCollection, resolvedRecord, ts - 1);
142
            doneCallback.call(params);
143
		} catch (Throwable e) {
144
            log.error(e);
145
            throw new MDStoreServiceException("Error on resolving records ", e);
146
		}
147
	}
148

    
149
    private void upsertResolved(MongoCollection<DBObject> currentMdStoreCollection, MongoCollection<DBObject> resolvedRecord, final long timestamp) {
150
        log.info("Updating resolved objects");
151

    
152
        final Bson queryByTs = Filters.gte("resolved_ts", Long.valueOf(timestamp));
153
        int i = 0;
154
        final FindIterable<DBObject> dbObjects = timestamp == 0 ? resolvedRecord.find() : resolvedRecord.find(queryByTs);
155
        for (DBObject object : dbObjects) {
156
			Bson query = Filters.eq("id", object.get("id").toString());
157
			final DBObject replacedObj = BasicDBObjectBuilder.start()
158
					.add("body", object.get("body").toString())
159
                    .add("resolved_ts", object.get("resolved_ts"))
160
                    .get();
161
			Bson newDocument = new Document("$set", replacedObj);
162
			currentMdStoreCollection.findOneAndUpdate(query, newDocument);
163
            i++;
164

    
165
		}
166

    
167
        log.info("Updated " + i);
168
    }
169
}
(4-4/5)