Project

General

Profile

1
package eu.dnetlib.resolver.mdstore.plugin;
2

    
3
import com.mongodb.BasicDBObject;
4
import com.mongodb.BasicDBObjectBuilder;
5
import com.mongodb.DBObject;
6
import com.mongodb.QueryBuilder;
7
import com.mongodb.client.FindIterable;
8
import com.mongodb.client.MongoCollection;
9
import com.mongodb.client.MongoDatabase;
10
import com.mongodb.client.model.Filters;
11
import com.mongodb.client.model.UpdateOptions;
12
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
13
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
14
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
15
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
16
import eu.dnetlib.dli.resolver.PIDResolver;
17
import eu.dnetlib.dli.resolver.model.serializer.ResolverSerializer;
18
import eu.dnetlib.rmi.data.MDStoreServiceException;
19
import org.apache.commons.lang3.StringUtils;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.bson.Document;
23
import org.bson.conversions.Bson;
24
import org.springframework.beans.factory.annotation.Autowired;
25

    
26
import java.util.ArrayList;
27
import java.util.List;
28
import java.util.Map;
29
import java.util.concurrent.*;
30

    
31
import static com.mongodb.client.model.Filters.eq;
32

    
33
/**
34
 * Created by sandro on 9/9/16.
35
 */
36
public class ResolverMDStorePlugin implements MDStorePlugin {
37

    
38
	private static final Log log = LogFactory.getLog(ResolverMDStorePlugin.class);
39
	public static DBObject DONE = new BasicDBObject();
40

    
41
	@Autowired
42
	private List<PIDResolver> pluginResolver;
43

    
44

    
45
	@Autowired
46
	private ResolverSerializer resolverSerializer;
47

    
48
	@Autowired
49
	private MDStoreTransactionManagerImpl transactionManager;
50

    
51
	public static void save(MongoCollection<DBObject> collection, DBObject document) {
52
		Object id = document.get("_id");
53
		if (id == null) {
54
			collection.insertOne(document);
55
		} else {
56
			collection.replaceOne(eq("_id", id), document, new UpdateOptions().upsert(true));
57
		}
58
	}
59

    
60
	@Override
61
	public void run(final MDStoreDao dao, final Map<String, String> params, final DoneCallback doneCallback) throws MDStoreServiceException {
62

    
63
		try {
64

    
65
			final String id = params.get("mdStoreId");
66

    
67
            int numberOfThreads = 4;
68

    
69
            final String numOfThreadsString = params.get("numberOfThreads");
70

    
71
            try {
72
                if (!StringUtils.isEmpty(numOfThreadsString)) {
73
                    numberOfThreads = Integer.parseInt(numOfThreadsString);
74
                }
75

    
76
            } catch (Throwable e) {
77
                log.error("Number of threads Param is not an int value it will apply by default 4");
78
            }
79

    
80
			final boolean refresh = params.get("refresh") != null && Boolean.parseBoolean(params.get("refresh"));
81

    
82
			final String internalId = transactionManager.readMdStore(id);
83

    
84
			final MongoDatabase db = transactionManager.getDb();
85

    
86
			final MongoCollection<DBObject> currentMdStoreCollection = db.getCollection(internalId, DBObject.class);
87

    
88
			final MongoCollection<DBObject> resolvedRecord = db.getCollection("resolved_" + StringUtils.substringBefore(id, "_"), DBObject.class);
89

    
90
            final BasicDBObject idx = new BasicDBObject();
91
            idx.put("resolved_ts", 1);
92
            resolvedRecord.createIndex(idx);
93

    
94
            upsertResolved(currentMdStoreCollection, resolvedRecord, 0);
95

    
96
			if (refresh) {
97
				resolvedRecord.drop();
98
			}
99

    
100
			final FindIterable<DBObject> mdstoreRecords = currentMdStoreCollection.find();
101

    
102
			final BlockingQueue<DBObject> queue = new ArrayBlockingQueue<>(100);
103

    
104
			final List<Future<Boolean>> responses = new ArrayList<>();
105

    
106
			final ExecutorService executor = Executors.newFixedThreadPool(100);
107

    
108
			final long total = currentMdStoreCollection.count();
109

    
110
			int previousPrintValue = -1;
111
			int currentPerc = 0;
112

    
113
            final long ts = System.currentTimeMillis();
114

    
115
            for (int i = 1; i < numberOfThreads; i++) {
116
                final RecordResolver resolver = new RecordResolver(ts);
117
                resolver.setInputQueue(queue);
118
				resolver.setOutputCollection(resolvedRecord);
119
				resolver.setSerializer(resolverSerializer);
120
				resolver.setPluginResolver(pluginResolver);
121
				responses.add(executor.submit(resolver));
122
			}
123

    
124
			int parsed = 0;
125

    
126

    
127
			for (DBObject currentMdStoreRecord : mdstoreRecords) {
128
				queue.put(currentMdStoreRecord);
129

    
130
				currentPerc = Math.round(((float) ++parsed / (float) total) * 100.0F);
131

    
132
				if (currentPerc != previousPrintValue) {
133
					log.info("Resolving process " + currentPerc + " %");
134
					previousPrintValue = currentPerc;
135
				}
136
			}
137
			queue.put(DONE);
138

    
139
			for (Future<Boolean> response : responses) {
140
				response.get();
141
			}
142
            upsertResolved(currentMdStoreCollection, resolvedRecord, ts - 1);
143
            doneCallback.call(params);
144
		} catch (Throwable e) {
145
            log.error(e);
146
            throw new MDStoreServiceException("Error on resolving records ", e);
147
		}
148
	}
149

    
150
    private void upsertResolved(MongoCollection<DBObject> currentMdStoreCollection, MongoCollection<DBObject> resolvedRecord, final long timestamp) {
151
        log.info("Updating resolved objects");
152

    
153
        final Bson queryByTs = Filters.gte("resolved_ts", Long.valueOf(timestamp));
154
        int i = 0;
155
        final FindIterable<DBObject> dbObjects = timestamp == 0 ? resolvedRecord.find() : resolvedRecord.find(queryByTs);
156
        for (DBObject object : dbObjects) {
157
			Bson query = Filters.eq("id", object.get("id").toString());
158
			final DBObject replacedObj = BasicDBObjectBuilder.start()
159
					.add("body", object.get("body").toString())
160
                    .add("resolved_ts", object.get("resolved_ts"))
161
                    .get();
162
			Bson newDocument = new Document("$set", replacedObj);
163
			currentMdStoreCollection.findOneAndUpdate(query, newDocument);
164
            i++;
165

    
166
		}
167

    
168
        log.info("Updated " + i);
169
    }
170
}
(2-2/2)