1 |
43783
|
sandro.lab
|
package eu.dnetlib.resolver.mdstore.plugin;
|
2 |
|
|
|
3 |
|
|
import com.mongodb.DBObject;
|
4 |
|
|
import com.mongodb.client.MongoCollection;
|
5 |
45816
|
sandro.lab
|
import eu.dnetlib.dli.DLIUtils;
|
6 |
49184
|
sandro.lab
|
import eu.dnetlib.dli.resolver.model.CompletionStatus;
|
7 |
|
|
import eu.dnetlib.dli.resolver.model.DLIObjectRelation;
|
8 |
|
|
import eu.dnetlib.dli.resolver.model.DLIResolvedObject;
|
9 |
|
|
import eu.dnetlib.dli.resolver.model.PID;
|
10 |
44356
|
claudio.at
|
import eu.dnetlib.dli.resolver.model.serializer.ResolverSerializer;
|
11 |
49184
|
sandro.lab
|
import eu.dnetlib.pid.resolver.PIDResolver;
|
12 |
|
|
import eu.dnetlib.pid.resolver.model.ObjectType;
|
13 |
44050
|
sandro.lab
|
import eu.dnetlib.resolver.parser.DLIParser;
|
14 |
43783
|
sandro.lab
|
import org.antlr.stringtemplate.StringTemplate;
|
15 |
|
|
import org.apache.commons.lang3.StringUtils;
|
16 |
|
|
import org.apache.commons.logging.Log;
|
17 |
|
|
import org.apache.commons.logging.LogFactory;
|
18 |
|
|
|
19 |
49184
|
sandro.lab
|
import java.util.HashMap;
|
20 |
|
|
import java.util.List;
|
21 |
|
|
import java.util.Map;
|
22 |
|
|
import java.util.concurrent.BlockingQueue;
|
23 |
|
|
import java.util.concurrent.Callable;
|
24 |
45816
|
sandro.lab
|
|
25 |
43783
|
sandro.lab
|
/**
|
26 |
|
|
* Created by sandro on 9/22/16.
|
27 |
|
|
*/
|
28 |
|
|
public class RecordResolver implements Callable<Boolean> {
|
29 |
|
|
|
30 |
|
|
private static final Log log = LogFactory.getLog(RecordResolver.class);
|
31 |
44050
|
sandro.lab
|
|
32 |
|
|
private final DLIParser parser = new DLIParser();
|
33 |
45917
|
sandro.lab
|
private final Long timestamp;
|
34 |
|
|
private List<PIDResolver> pluginResolver;
|
35 |
43783
|
sandro.lab
|
private BlockingQueue<DBObject> inputQueue;
|
36 |
|
|
private MongoCollection<DBObject> outputCollection;
|
37 |
44050
|
sandro.lab
|
private ResolverSerializer serializer;
|
38 |
|
|
|
39 |
45917
|
sandro.lab
|
public RecordResolver(final long ts) {
|
40 |
|
|
this.timestamp = Long.valueOf(ts);
|
41 |
|
|
}
|
42 |
|
|
|
43 |
46475
|
sandro.lab
|
private String resolve(final String inputRecord) {
|
44 |
44050
|
sandro.lab
|
|
45 |
49184
|
sandro.lab
|
final DLIResolvedObject inputObject = parser.parse(inputRecord);
|
46 |
|
|
if (inputObject != null && StringUtils.isNoneBlank(inputObject.getResolvedDate()))
|
47 |
45835
|
sandro.lab
|
return null;
|
48 |
45628
|
sandro.lab
|
|
49 |
|
|
if (inputObject != null && !StringUtils.isBlank(inputObject.getPid())) {
|
50 |
|
|
log.debug("trying to resolve " + inputObject.getPid());
|
51 |
|
|
}
|
52 |
|
|
|
53 |
43783
|
sandro.lab
|
boolean shouldUpdate = false;
|
54 |
|
|
if (inputObject.getCompletionStatus() == null || !inputObject.getCompletionStatus().equals(CompletionStatus.complete.toString())) {
|
55 |
|
|
shouldUpdate = shouldUpdate || tryToResolveRecord(inputObject);
|
56 |
|
|
}
|
57 |
|
|
if (inputObject.getRelations() != null) {
|
58 |
49184
|
sandro.lab
|
for (DLIObjectRelation rel : inputObject.getRelations()) {
|
59 |
45816
|
sandro.lab
|
final Map<String, ObjectType> resolvedRelation = tryToResolveRelation(rel.getTargetPID());
|
60 |
|
|
if (resolvedRelation != null && !resolvedRelation.isEmpty()) {
|
61 |
|
|
resolvedRelation.entrySet()
|
62 |
|
|
.forEach(e -> {
|
63 |
|
|
rel.setTargetPID(new PID(e.getKey(), "dnet"));
|
64 |
|
|
rel.setTargetType(e.getValue());
|
65 |
|
|
});
|
66 |
|
|
shouldUpdate = true;
|
67 |
43783
|
sandro.lab
|
}
|
68 |
|
|
}
|
69 |
|
|
if (shouldUpdate) {
|
70 |
|
|
final String newXML = serializer.serializeReplacingXML(inputRecord, inputObject);
|
71 |
|
|
return newXML;
|
72 |
|
|
}
|
73 |
|
|
}
|
74 |
|
|
return null;
|
75 |
|
|
}
|
76 |
|
|
|
77 |
|
|
public void setTemplate(final StringTemplate template) {
|
78 |
43939
|
sandro.lab
|
serializer.setDmfTemplate(template);
|
79 |
43783
|
sandro.lab
|
}
|
80 |
|
|
|
81 |
45816
|
sandro.lab
|
private Map<String, ObjectType> tryToResolveRelation(final PID currentPid) {
|
82 |
|
|
if (currentPid.getId() != null && currentPid.getId().contains("dli_resolver")) {
|
83 |
43783
|
sandro.lab
|
return null;
|
84 |
|
|
}
|
85 |
45628
|
sandro.lab
|
log.debug("tryToResolveRelation " + currentPid);
|
86 |
|
|
|
87 |
43783
|
sandro.lab
|
for (PIDResolver resolver : pluginResolver) {
|
88 |
49184
|
sandro.lab
|
final DLIResolvedObject currentIdentifier = (DLIResolvedObject) resolver.retrievePID(currentPid.getId(), currentPid.getType());
|
89 |
45816
|
sandro.lab
|
|
90 |
|
|
|
91 |
|
|
if (currentIdentifier != null && !StringUtils.isBlank(currentIdentifier.getPid()) && currentIdentifier.getPid().toLowerCase().equals(currentPid.getId().toLowerCase())) {
|
92 |
|
|
final HashMap<String, ObjectType> result = new HashMap<>();
|
93 |
45835
|
sandro.lab
|
result.put("dli_resolver::" + DLIUtils.generateIdentifier(currentIdentifier.getPid(), currentIdentifier.getPidType()),
|
94 |
|
|
currentIdentifier.getType());
|
95 |
45816
|
sandro.lab
|
return result;
|
96 |
|
|
}
|
97 |
43783
|
sandro.lab
|
}
|
98 |
|
|
return null;
|
99 |
|
|
}
|
100 |
|
|
|
101 |
49184
|
sandro.lab
|
private boolean tryToResolveRecord(final DLIResolvedObject object) {
|
102 |
43783
|
sandro.lab
|
|
103 |
|
|
for (PIDResolver resolver : pluginResolver) {
|
104 |
49184
|
sandro.lab
|
final DLIResolvedObject resolvedObject = (DLIResolvedObject) resolver.retrievePID(object.getPid(), object.getPidType());
|
105 |
|
|
if (resolvedObject != null &&
|
106 |
43783
|
sandro.lab
|
resolvedObject.getCompletionStatus() != null &&
|
107 |
|
|
resolvedObject.getCompletionStatus().toString().equals(CompletionStatus.complete.toString())) {
|
108 |
|
|
{
|
109 |
|
|
object.setAuthors(resolvedObject.getAuthors());
|
110 |
|
|
object.setTitles(resolvedObject.getTitles());
|
111 |
|
|
object.setCompletionStatus(resolvedObject.getCompletionStatus());
|
112 |
|
|
object.setDate(resolvedObject.getDate());
|
113 |
|
|
object.getDatasourceProvenance().addAll(resolvedObject.getDatasourceProvenance());
|
114 |
44352
|
sandro.lab
|
object.setDescription(resolvedObject.getDescription());
|
115 |
|
|
object.setSubjects(resolvedObject.getSubjects());
|
116 |
43783
|
sandro.lab
|
object.setType(resolvedObject.getType());
|
117 |
45628
|
sandro.lab
|
log.debug("Record Resolved by " + resolver.getClass().getCanonicalName() + " PID: " + object.getPid());
|
118 |
43783
|
sandro.lab
|
return true;
|
119 |
|
|
}
|
120 |
|
|
}
|
121 |
|
|
}
|
122 |
45628
|
sandro.lab
|
log.debug("Record NOT Resolved PID: " + object.getPid());
|
123 |
43783
|
sandro.lab
|
return false;
|
124 |
|
|
}
|
125 |
|
|
|
126 |
|
|
public void setPluginResolver(final List<PIDResolver> pluginResolver) {
|
127 |
|
|
this.pluginResolver = pluginResolver;
|
128 |
|
|
}
|
129 |
|
|
|
130 |
|
|
@Override
|
131 |
|
|
public Boolean call() throws Exception {
|
132 |
46475
|
sandro.lab
|
|
133 |
|
|
log.info("START HERE!");
|
134 |
|
|
|
135 |
43783
|
sandro.lab
|
DBObject currentObject = inputQueue.take();
|
136 |
44283
|
sandro.lab
|
int i = 0;
|
137 |
45917
|
sandro.lab
|
String currentRecord = null;
|
138 |
46216
|
sandro.lab
|
double sumTotal = 0;
|
139 |
45917
|
sandro.lab
|
while (currentObject != ResolverMDStorePlugin.DONE) {
|
140 |
43783
|
sandro.lab
|
try {
|
141 |
45917
|
sandro.lab
|
currentRecord = (String) currentObject.get("body");
|
142 |
|
|
if (currentObject.get("resolved_ts") == null) {
|
143 |
46216
|
sandro.lab
|
final double start = System.currentTimeMillis();
|
144 |
45917
|
sandro.lab
|
final String resolvedRecord = resolve(currentRecord);
|
145 |
|
|
if (resolvedRecord != null) {
|
146 |
|
|
currentObject.put("body", resolvedRecord);
|
147 |
|
|
currentObject.removeField("_id");
|
148 |
|
|
currentObject.put("resolved_ts", timestamp);
|
149 |
|
|
outputCollection.insertOne(currentObject);
|
150 |
|
|
}
|
151 |
46216
|
sandro.lab
|
final double total = System.currentTimeMillis() - start;
|
152 |
|
|
sumTotal += total;
|
153 |
|
|
|
154 |
45917
|
sandro.lab
|
}
|
155 |
46475
|
sandro.lab
|
|
156 |
43783
|
sandro.lab
|
currentObject = inputQueue.take();
|
157 |
46216
|
sandro.lab
|
if (i++ % 100 == 0) {
|
158 |
46475
|
sandro.lab
|
log.debug(Thread.currentThread().getId() + " total object resolved: " + i + "in average time " + (sumTotal / 100) + "ms");
|
159 |
46216
|
sandro.lab
|
sumTotal = 0;
|
160 |
|
|
}
|
161 |
|
|
} catch (Throwable e) {
|
162 |
45917
|
sandro.lab
|
log.error("Error on resolving objects " + currentRecord, e);
|
163 |
|
|
return false;
|
164 |
43783
|
sandro.lab
|
}
|
165 |
|
|
}
|
166 |
|
|
if (currentObject == ResolverMDStorePlugin.DONE) {
|
167 |
|
|
inputQueue.put(currentObject);
|
168 |
|
|
}
|
169 |
|
|
return true;
|
170 |
|
|
|
171 |
|
|
}
|
172 |
|
|
|
173 |
|
|
public BlockingQueue<DBObject> getInputQueue() {
|
174 |
|
|
return inputQueue;
|
175 |
|
|
}
|
176 |
|
|
|
177 |
|
|
public void setInputQueue(final BlockingQueue<DBObject> inputQueue) {
|
178 |
|
|
this.inputQueue = inputQueue;
|
179 |
|
|
}
|
180 |
|
|
|
181 |
|
|
public MongoCollection<DBObject> getOutputCollection() {
|
182 |
|
|
return outputCollection;
|
183 |
|
|
}
|
184 |
|
|
|
185 |
|
|
public void setOutputCollection(final MongoCollection<DBObject> outputCollection) {
|
186 |
|
|
this.outputCollection = outputCollection;
|
187 |
|
|
}
|
188 |
44050
|
sandro.lab
|
|
189 |
|
|
public void setSerializer(final ResolverSerializer serializer) {
|
190 |
|
|
final ResolverSerializer tmp = new ResolverSerializer();
|
191 |
|
|
tmp.setPmfTemplate(new StringTemplate(serializer.getPmfTemplate().getTemplate()));
|
192 |
|
|
tmp.setDmfTemplate(new StringTemplate(serializer.getDmfTemplate().getTemplate()));
|
193 |
|
|
tmp.setScholixTemplate(new StringTemplate(serializer.getScholixTemplate().getTemplate()));
|
194 |
|
|
this.serializer = tmp;
|
195 |
|
|
}
|
196 |
43783
|
sandro.lab
|
}
|