1
|
package eu.dnetlib.resolver.mdstore.plugin;
|
2
|
|
3
|
import com.mongodb.BasicDBObject;
|
4
|
import com.mongodb.BasicDBObjectBuilder;
|
5
|
import com.mongodb.DBObject;
|
6
|
import com.mongodb.QueryBuilder;
|
7
|
import com.mongodb.client.FindIterable;
|
8
|
import com.mongodb.client.MongoCollection;
|
9
|
import com.mongodb.client.MongoDatabase;
|
10
|
import com.mongodb.client.model.Filters;
|
11
|
import com.mongodb.client.model.UpdateOptions;
|
12
|
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
|
13
|
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
|
14
|
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
|
15
|
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
|
16
|
import eu.dnetlib.dli.resolver.PIDResolver;
|
17
|
import eu.dnetlib.dli.resolver.model.serializer.ResolverSerializer;
|
18
|
import eu.dnetlib.rmi.data.MDStoreServiceException;
|
19
|
import org.apache.commons.lang3.StringUtils;
|
20
|
import org.apache.commons.logging.Log;
|
21
|
import org.apache.commons.logging.LogFactory;
|
22
|
import org.bson.Document;
|
23
|
import org.bson.conversions.Bson;
|
24
|
import org.springframework.beans.factory.annotation.Autowired;
|
25
|
|
26
|
import java.util.ArrayList;
|
27
|
import java.util.List;
|
28
|
import java.util.Map;
|
29
|
import java.util.concurrent.*;
|
30
|
|
31
|
import static com.mongodb.client.model.Filters.eq;
|
32
|
|
33
|
/**
|
34
|
* Created by sandro on 9/9/16.
|
35
|
*/
|
36
|
public class ResolverMDStorePlugin implements MDStorePlugin {
|
37
|
|
38
|
private static final Log log = LogFactory.getLog(ResolverMDStorePlugin.class);
|
39
|
public static DBObject DONE = new BasicDBObject();
|
40
|
|
41
|
@Autowired
|
42
|
private List<PIDResolver> pluginResolver;
|
43
|
|
44
|
|
45
|
@Autowired
|
46
|
private ResolverSerializer resolverSerializer;
|
47
|
|
48
|
@Autowired
|
49
|
private MDStoreTransactionManagerImpl transactionManager;
|
50
|
|
51
|
public static void save(MongoCollection<DBObject> collection, DBObject document) {
|
52
|
Object id = document.get("_id");
|
53
|
if (id == null) {
|
54
|
collection.insertOne(document);
|
55
|
} else {
|
56
|
collection.replaceOne(eq("_id", id), document, new UpdateOptions().upsert(true));
|
57
|
}
|
58
|
}
|
59
|
|
60
|
@Override
|
61
|
public void run(final MDStoreDao dao, final Map<String, String> params, final DoneCallback doneCallback) throws MDStoreServiceException {
|
62
|
|
63
|
try {
|
64
|
|
65
|
final String id = params.get("mdStoreId");
|
66
|
|
67
|
int numberOfThreads = 4;
|
68
|
|
69
|
final String numOfThreadsString = params.get("numberOfThreads");
|
70
|
|
71
|
try {
|
72
|
if (!StringUtils.isEmpty(numOfThreadsString)) {
|
73
|
numberOfThreads = Integer.parseInt(numOfThreadsString);
|
74
|
}
|
75
|
|
76
|
} catch (Throwable e) {
|
77
|
log.error("Number of threads Param is not an int value it will apply by default 4");
|
78
|
}
|
79
|
|
80
|
final boolean refresh = params.get("refresh") != null && Boolean.parseBoolean(params.get("refresh"));
|
81
|
|
82
|
final String internalId = transactionManager.readMdStore(id);
|
83
|
|
84
|
final MongoDatabase db = transactionManager.getDb();
|
85
|
|
86
|
final MongoCollection<DBObject> currentMdStoreCollection = db.getCollection(internalId, DBObject.class);
|
87
|
|
88
|
final MongoCollection<DBObject> resolvedRecord = db.getCollection("resolved_" + StringUtils.substringBefore(id, "_"), DBObject.class);
|
89
|
|
90
|
final BasicDBObject idx = new BasicDBObject();
|
91
|
idx.put("resolved_ts", 1);
|
92
|
resolvedRecord.createIndex(idx);
|
93
|
|
94
|
upsertResolved(currentMdStoreCollection, resolvedRecord, 0);
|
95
|
|
96
|
if (refresh) {
|
97
|
resolvedRecord.drop();
|
98
|
}
|
99
|
|
100
|
final FindIterable<DBObject> mdstoreRecords = currentMdStoreCollection.find();
|
101
|
|
102
|
mdstoreRecords.noCursorTimeout(true);
|
103
|
|
104
|
final BlockingQueue<DBObject> queue = new ArrayBlockingQueue<>(100);
|
105
|
|
106
|
final List<Future<Boolean>> responses = new ArrayList<>();
|
107
|
|
108
|
final ExecutorService executor = Executors.newFixedThreadPool(100);
|
109
|
|
110
|
final long total = currentMdStoreCollection.count();
|
111
|
|
112
|
int previousPrintValue = -1;
|
113
|
int currentPerc = 0;
|
114
|
|
115
|
final long ts = System.currentTimeMillis();
|
116
|
|
117
|
for (int i = 0; i < numberOfThreads; i++) {
|
118
|
final RecordResolver resolver = new RecordResolver(ts);
|
119
|
resolver.setInputQueue(queue);
|
120
|
resolver.setOutputCollection(resolvedRecord);
|
121
|
resolver.setSerializer(resolverSerializer);
|
122
|
resolver.setPluginResolver(pluginResolver);
|
123
|
responses.add(executor.submit(resolver));
|
124
|
}
|
125
|
|
126
|
int parsed = 0;
|
127
|
|
128
|
for (DBObject currentMdStoreRecord : mdstoreRecords) {
|
129
|
queue.put(currentMdStoreRecord);
|
130
|
|
131
|
currentPerc = Math.round(((float) ++parsed / (float) total) * 100.0F);
|
132
|
|
133
|
if (currentPerc != previousPrintValue) {
|
134
|
log.info("Resolving process " + currentPerc + " %");
|
135
|
previousPrintValue = currentPerc;
|
136
|
}
|
137
|
}
|
138
|
queue.put(DONE);
|
139
|
|
140
|
for (Future<Boolean> response : responses) {
|
141
|
response.get();
|
142
|
}
|
143
|
upsertResolved(currentMdStoreCollection, resolvedRecord, ts - 1);
|
144
|
doneCallback.call(params);
|
145
|
} catch (Throwable e) {
|
146
|
log.error(e);
|
147
|
throw new MDStoreServiceException("Error on resolving records ", e);
|
148
|
}
|
149
|
}
|
150
|
|
151
|
private void upsertResolved(MongoCollection<DBObject> currentMdStoreCollection, MongoCollection<DBObject> resolvedRecord, final long timestamp) {
|
152
|
log.info("Updating resolved objects");
|
153
|
|
154
|
final Bson queryByTs = Filters.gte("resolved_ts", Long.valueOf(timestamp));
|
155
|
int i = 0;
|
156
|
final FindIterable<DBObject> dbObjects = timestamp == 0 ? resolvedRecord.find() : resolvedRecord.find(queryByTs);
|
157
|
for (DBObject object : dbObjects) {
|
158
|
Bson query = Filters.eq("id", object.get("id").toString());
|
159
|
final DBObject replacedObj = BasicDBObjectBuilder.start()
|
160
|
.add("body", object.get("body").toString())
|
161
|
.add("resolved_ts", object.get("resolved_ts"))
|
162
|
.get();
|
163
|
Bson newDocument = new Document("$set", replacedObj);
|
164
|
currentMdStoreCollection.findOneAndUpdate(query, newDocument);
|
165
|
i++;
|
166
|
|
167
|
}
|
168
|
|
169
|
log.info("Updated " + i);
|
170
|
}
|
171
|
}
|