Revision 54390
Added by Sandro La Bruzzo over 5 years ago
ResolverMDStorePlugin.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.pid.resolver.mdstore.plugin; |
2 | 2 |
|
3 |
import com.google.common.collect.Lists; |
|
3 | 4 |
import com.mongodb.BasicDBObject; |
4 | 5 |
import com.mongodb.BasicDBObjectBuilder; |
5 | 6 |
import com.mongodb.DBObject; |
6 | 7 |
import com.mongodb.client.FindIterable; |
7 | 8 |
import com.mongodb.client.MongoCollection; |
8 | 9 |
import com.mongodb.client.MongoDatabase; |
9 |
import com.mongodb.client.model.Filters; |
|
10 |
import com.mongodb.client.model.UpdateOptions; |
|
10 |
import com.mongodb.client.model.*; |
|
11 | 11 |
import eu.dnetlib.data.mdstore.modular.action.DoneCallback; |
12 | 12 |
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin; |
13 | 13 |
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao; |
... | ... | |
176 | 176 |
final Bson queryByTs = Filters.gte("resolved_ts", Long.valueOf(timestamp)); |
177 | 177 |
int i = 0; |
178 | 178 |
final FindIterable<DBObject> dbObjects = timestamp == 0 ? resolvedRecord.find() : resolvedRecord.find(queryByTs); |
179 |
for (DBObject object : dbObjects) { |
|
180 |
Bson query = Filters.eq("id", object.get("id").toString()); |
|
181 |
final DBObject replacedObj = BasicDBObjectBuilder.start() |
|
182 |
.add("body", object.get("body").toString()) |
|
183 |
.add("resolved_ts", object.get("resolved_ts")) |
|
184 |
.get(); |
|
185 |
Bson newDocument = new Document("$set", replacedObj); |
|
186 |
currentMdStoreCollection.findOneAndUpdate(query, newDocument); |
|
187 |
i++; |
|
179 |
final UpdateOptions f = new UpdateOptions().upsert(true); |
|
180 |
final List<WriteModel<DBObject>> upsertList = new ArrayList<>(); |
|
181 |
final BulkWriteOptions writeOptions = new BulkWriteOptions().ordered(false); |
|
182 |
final int bulkSize = 1000; |
|
183 |
long validOpCounter =0; |
|
184 |
for (DBObject object : dbObjects) { |
|
185 |
if (StringUtils.isNotBlank(object.get("id").toString())) { |
|
186 |
final DBObject replacedObj = BasicDBObjectBuilder.start() |
|
187 |
.add("body", object.get("body").toString()) |
|
188 |
.add("resolved_ts", object.get("resolved_ts")) |
|
189 |
.get(); |
|
188 | 190 |
|
191 |
upsertList.add(new ReplaceOneModel(new BasicDBObject("id", object.get("id").toString()), replacedObj, f)); |
|
192 |
validOpCounter++; |
|
193 |
if (((validOpCounter % bulkSize) == 0) && (validOpCounter != 0)) { |
|
194 |
currentMdStoreCollection.bulkWrite(upsertList, writeOptions); |
|
195 |
upsertList.clear(); |
|
196 |
log.info("Transaction commit: Upserting: "+validOpCounter); |
|
197 |
} |
|
198 |
} |
|
189 | 199 |
} |
200 |
// if (upsertList.size()>0){ |
|
201 |
// mdstore.bulkWrite(upsertList, writeOptions); |
|
202 |
// } |
|
203 |
// |
|
204 |
// |
|
205 |
// |
|
206 |
// for (DBObject object : dbObjects) { |
|
207 |
// Bson query = Filters.eq("id", object.get("id").toString()); |
|
208 |
// final DBObject replacedObj = BasicDBObjectBuilder.start() |
|
209 |
// .add("body", object.get("body").toString()) |
|
210 |
// .add("resolved_ts", object.get("resolved_ts")) |
|
211 |
// .get(); |
|
212 |
// Bson newDocument = new Document("$set", replacedObj); |
|
213 |
// currentMdStoreCollection.findOneAndUpdate(query, newDocument); |
|
214 |
// i++; |
|
215 |
// |
|
216 |
// } |
|
190 | 217 |
|
191 | 218 |
log.info("Updated " + i); |
192 | 219 |
} |
220 |
|
|
221 |
|
|
193 | 222 |
} |
Also available in: Unified diff
updated resolver and Trying to allow mdstore plugin to print progress information on UI