1
|
package eu.dnetlib.pid.resolver.mdstore.plugin;
|
2
|
|
3
|
import com.google.common.collect.Lists;
|
4
|
import com.mongodb.BasicDBObject;
|
5
|
import com.mongodb.BasicDBObjectBuilder;
|
6
|
import com.mongodb.DBObject;
|
7
|
import com.mongodb.client.FindIterable;
|
8
|
import com.mongodb.client.MongoCollection;
|
9
|
import com.mongodb.client.MongoDatabase;
|
10
|
import com.mongodb.client.model.*;
|
11
|
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
|
12
|
import eu.dnetlib.data.mdstore.modular.action.FailedCallback;
|
13
|
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
|
14
|
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
|
15
|
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
|
16
|
import eu.dnetlib.pid.resolver.PIDResolver;
|
17
|
import eu.dnetlib.rmi.data.MDStoreServiceException;
|
18
|
import org.apache.commons.lang3.StringUtils;
|
19
|
import org.apache.commons.logging.Log;
|
20
|
import org.apache.commons.logging.LogFactory;
|
21
|
import org.bson.Document;
|
22
|
import org.bson.conversions.Bson;
|
23
|
import org.springframework.beans.factory.annotation.Autowired;
|
24
|
|
25
|
import java.util.*;
|
26
|
import java.util.concurrent.*;
|
27
|
|
28
|
import static com.mongodb.client.model.Filters.*;
|
29
|
import static com.mongodb.client.model.Filters.gt;
|
30
|
import static com.mongodb.client.model.Filters.lt;
|
31
|
|
32
|
/**
|
33
|
* Created by sandro on 9/9/16.
|
34
|
*/
|
35
|
public class ResolverMDStorePlugin implements MDStorePlugin {
|
36
|
|
37
|
private static final Log log = LogFactory.getLog(ResolverMDStorePlugin.class);
|
38
|
public static DBObject DONE = new BasicDBObject();
|
39
|
|
40
|
|
41
|
|
42
|
@Autowired
|
43
|
private List<PIDResolver> pluginResolver;
|
44
|
|
45
|
|
46
|
@Autowired
|
47
|
private ResolverSerializer resolverSerializer;
|
48
|
|
49
|
@Autowired
|
50
|
private RecordResolverFactory recordResolverFactory;
|
51
|
|
52
|
|
53
|
@Autowired
|
54
|
private MDStoreTransactionManagerImpl transactionManager;
|
55
|
|
56
|
public static void save(MongoCollection<DBObject> collection, DBObject document) {
|
57
|
Object id = document.get("_id");
|
58
|
if (id == null) {
|
59
|
collection.insertOne(document);
|
60
|
} else {
|
61
|
collection.replaceOne(eq("_id", id), document, new UpdateOptions().upsert(true));
|
62
|
}
|
63
|
}
|
64
|
|
65
|
@Override
|
66
|
public void run(final MDStoreDao dao, final Map<String, String> params, final DoneCallback doneCallback, final FailedCallback failedCallback) {
|
67
|
try {
|
68
|
final String id = params.get("mdStoreId");
|
69
|
final boolean offline = "true".equals(params.get("offline"));
|
70
|
|
71
|
int numberOfThreads = 4;
|
72
|
|
73
|
final String numOfThreadsString = params.get("numberOfThreads");
|
74
|
|
75
|
final String resolvingMode = params.get("collectionMode");
|
76
|
|
77
|
final String lastResolveDate = params.get("lastResolveDate");
|
78
|
|
79
|
try {
|
80
|
if (!StringUtils.isEmpty(numOfThreadsString)) {
|
81
|
numberOfThreads = Integer.parseInt(numOfThreadsString);
|
82
|
}
|
83
|
|
84
|
} catch (Throwable e) {
|
85
|
log.error("Number of threads Param is not an int value it will apply by default 4");
|
86
|
}
|
87
|
|
88
|
final boolean refresh = "refresh".equalsIgnoreCase(resolvingMode);
|
89
|
|
90
|
final String internalId = transactionManager.readMdStore(id);
|
91
|
|
92
|
final MongoDatabase db = transactionManager.getDb();
|
93
|
|
94
|
final MongoCollection<DBObject> currentMdStoreCollection = db.getCollection(internalId, DBObject.class);
|
95
|
|
96
|
final MongoCollection<DBObject> resolvedRecord = db.getCollection("resolved_" + StringUtils.substringBefore(id, "_"), DBObject.class);
|
97
|
|
98
|
final BasicDBObject idx = new BasicDBObject();
|
99
|
idx.put("resolved_ts", 1);
|
100
|
resolvedRecord.createIndex(idx);
|
101
|
|
102
|
// if (refresh) {
|
103
|
// resolvedRecord.drop();
|
104
|
// }
|
105
|
|
106
|
if (!"INCREMENTAL".equalsIgnoreCase(resolvingMode))
|
107
|
upsertResolved(currentMdStoreCollection, resolvedRecord, 0);
|
108
|
|
109
|
|
110
|
final FindIterable<DBObject> mdstoreRecords = "refresh".equalsIgnoreCase(resolvingMode)?currentMdStoreCollection.find(): currentMdStoreCollection.find(dateQuery(lastResolveDate==null?0:Long.parseLong(lastResolveDate),null));
|
111
|
|
112
|
mdstoreRecords.noCursorTimeout(true);
|
113
|
|
114
|
final BlockingQueue<DBObject> queue = new ArrayBlockingQueue<>(100);
|
115
|
|
116
|
final List<Future<Boolean>> responses = new ArrayList<>();
|
117
|
|
118
|
final ExecutorService executor = Executors.newFixedThreadPool(100);
|
119
|
|
120
|
final long total = "refresh".equalsIgnoreCase(resolvingMode)?currentMdStoreCollection.count():currentMdStoreCollection.count(dateQuery(lastResolveDate==null?0:Long.parseLong(lastResolveDate),null));
|
121
|
|
122
|
int previousPrintValue = -1;
|
123
|
int currentPerc;
|
124
|
|
125
|
final long ts = System.currentTimeMillis();
|
126
|
|
127
|
|
128
|
Collections.sort(pluginResolver);
|
129
|
|
130
|
for (int i = 0; i < numberOfThreads; i++) {
|
131
|
final RecordResolver resolver = recordResolverFactory.createResolver(ts, queue, resolvedRecord, resolverSerializer, pluginResolver, offline, false);
|
132
|
responses.add(executor.submit(resolver));
|
133
|
}
|
134
|
|
135
|
int parsed = 0;
|
136
|
|
137
|
for (DBObject currentMdStoreRecord : mdstoreRecords) {
|
138
|
queue.put(currentMdStoreRecord);
|
139
|
|
140
|
currentPerc = Math.round(((float) ++parsed / (float) total) * 100.0F);
|
141
|
|
142
|
if (currentPerc != previousPrintValue) {
|
143
|
log.info("Resolving process " + currentPerc + " %");
|
144
|
previousPrintValue = currentPerc;
|
145
|
}
|
146
|
}
|
147
|
queue.put(DONE);
|
148
|
|
149
|
for (Future<Boolean> response : responses) {
|
150
|
response.get();
|
151
|
}
|
152
|
upsertResolved(currentMdStoreCollection, resolvedRecord, ts - 1);
|
153
|
doneCallback.call(params);
|
154
|
} catch (Throwable e) {
|
155
|
log.error(e);
|
156
|
throw new RuntimeException("Error on resolving records ", e);
|
157
|
}
|
158
|
}
|
159
|
|
160
|
|
161
|
private Bson dateQuery(final Long from, final Long until) {
|
162
|
if (from != null & until != null) {
|
163
|
return and(gt("timestamp", from), lt("timestamp", until));
|
164
|
}
|
165
|
if (from != null) {
|
166
|
return gt("timestamp", from);
|
167
|
}
|
168
|
if (until != null) {
|
169
|
return lt("timestamp", until);
|
170
|
}
|
171
|
return null;
|
172
|
}
|
173
|
|
174
|
|
175
|
private void upsertResolved(MongoCollection<DBObject> currentMdStoreCollection, MongoCollection<DBObject> resolvedRecord, final long timestamp) {
|
176
|
log.info("Updating resolved objects");
|
177
|
|
178
|
final Bson queryByTs = Filters.gte("resolved_ts", Long.valueOf(timestamp));
|
179
|
int i = 0;
|
180
|
final FindIterable<DBObject> dbObjects = timestamp == 0 ? resolvedRecord.find() : resolvedRecord.find(queryByTs);
|
181
|
final UpdateOptions f = new UpdateOptions().upsert(true);
|
182
|
final List<WriteModel<DBObject>> upsertList = new ArrayList<>();
|
183
|
final BulkWriteOptions writeOptions = new BulkWriteOptions().ordered(false);
|
184
|
final int bulkSize = 1000;
|
185
|
long validOpCounter =0;
|
186
|
for (DBObject object : dbObjects) {
|
187
|
if (StringUtils.isNotBlank(object.get("id").toString())) {
|
188
|
final DBObject replacedObj = BasicDBObjectBuilder.start()
|
189
|
.add("body", object.get("body").toString())
|
190
|
.add("id", object.get("id").toString())
|
191
|
.add("resolved_ts", object.get("resolved_ts"))
|
192
|
.get();
|
193
|
|
194
|
upsertList.add(new ReplaceOneModel(new BasicDBObject("id", object.get("id").toString()), replacedObj, f));
|
195
|
validOpCounter++;
|
196
|
if (((validOpCounter % bulkSize) == 0) && (validOpCounter != 0)) {
|
197
|
currentMdStoreCollection.bulkWrite(upsertList, writeOptions);
|
198
|
upsertList.clear();
|
199
|
log.info("Transaction commit: Upserting: "+validOpCounter);
|
200
|
}
|
201
|
}
|
202
|
}
|
203
|
if (upsertList.size()>0){
|
204
|
currentMdStoreCollection.bulkWrite(upsertList, writeOptions);
|
205
|
}
|
206
|
//
|
207
|
//
|
208
|
//
|
209
|
// for (DBObject object : dbObjects) {
|
210
|
// Bson query = Filters.eq("id", object.get("id").toString());
|
211
|
// final DBObject replacedObj = BasicDBObjectBuilder.start()
|
212
|
// .add("body", object.get("body").toString())
|
213
|
// .add("resolved_ts", object.get("resolved_ts"))
|
214
|
// .get();
|
215
|
// Bson newDocument = new Document("$set", replacedObj);
|
216
|
// currentMdStoreCollection.findOneAndUpdate(query, newDocument);
|
217
|
// i++;
|
218
|
//
|
219
|
// }
|
220
|
|
221
|
log.info("Updated " + i);
|
222
|
}
|
223
|
|
224
|
|
225
|
}
|