1
|
package eu.dnetlib.msro.workflows.nodes;
|
2
|
|
3
|
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.dli.resolver.model.*;
import eu.dnetlib.enabling.resultset.ResultSetInfo;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.procs.Token;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.pid.resolver.PIDResolver;
import eu.dnetlib.pid.resolver.model.ObjectProvenance;
import eu.dnetlib.pid.resolver.model.ObjectRelation;
import eu.dnetlib.pid.resolver.model.PID;
import eu.dnetlib.resolver.parser.DMFResolverParser;
import eu.dnetlib.rmi.common.ResultSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

import static eu.dnetlib.data.proto.dli.ScholixObjectProtos.*;
|
32
|
|
33
|
public class ResolveAndIndexJobNode extends SimpleJobNode implements ProgressProvider {
|
34
|
|
35
|
private static final Log log = LogFactory.getLog(ResolveAndIndexJobNode.class);
|
36
|
|
37
|
|
38
|
private static final String BASE_CFG_URL = "http://%s:9200/%s/scholix/%s/?pretty";
|
39
|
|
40
|
|
41
|
private String inputEprParam;
|
42
|
|
43
|
private String indexHost;
|
44
|
|
45
|
private String indexName;
|
46
|
|
47
|
private int counter;
|
48
|
|
49
|
private int total;
|
50
|
|
51
|
|
52
|
@Autowired
|
53
|
private List<PIDResolver> pluginResolver;
|
54
|
|
55
|
@Autowired
|
56
|
private ResultSetClient resultSetClient;
|
57
|
|
58
|
@Override
|
59
|
protected String execute(Env env) throws Exception {
|
60
|
|
61
|
final ResultSet<?> rsIn = env.getAttribute(this.inputEprParam, ResultSet.class);
|
62
|
|
63
|
|
64
|
final Iterable<String> records = resultSetClient.iter(rsIn, String.class);
|
65
|
|
66
|
ResultSetInfo info = resultSetClient.info(rsIn);
|
67
|
this.total = info.getTotal();
|
68
|
|
69
|
final RestTemplate restTemplate = new RestTemplate();
|
70
|
restTemplate.getMessageConverters()
|
71
|
.add(0, new StringHttpMessageConverter(Charset.forName("UTF-8")));
|
72
|
|
73
|
setIndexHost(indexHost);
|
74
|
setIndexName(indexName);
|
75
|
|
76
|
DMFResolverParser parser = new DMFResolverParser();
|
77
|
|
78
|
|
79
|
for (String record : records) {
|
80
|
this.counter++;
|
81
|
final DLIResolvedObject result = parser.parseObject(record);
|
82
|
if (result == null) {
|
83
|
log.error("error on parsing " + record);
|
84
|
continue;
|
85
|
}
|
86
|
for (final ObjectRelation rels : result.getRelations()) {
|
87
|
final DLIResolvedObject resolvedRelation = resolveRelation(rels.getTargetPID(), result.getDatasourceProvenance().get(0));
|
88
|
final Scholix.Builder scholix = Scholix.newBuilder();
|
89
|
|
90
|
scholix.addLinkproviderBuilder()
|
91
|
.setName(result.getDatasourceProvenance().get(0).getDatasource())
|
92
|
.addIdentifiersBuilder()
|
93
|
.setIdentifier(result.getDatasourceProvenance().get(0).getDatasourceId())
|
94
|
.setSchema("dnetIdentifier");
|
95
|
|
96
|
scholix.setRelationship(ScholixRelationship.newBuilder()
|
97
|
.setName(rels.getRelationSemantics())
|
98
|
.setInverse(rels.getInverseRelation())
|
99
|
.setSchema("datacite")
|
100
|
.build());
|
101
|
|
102
|
final ScholixResource source = generateResource(result);
|
103
|
final ScholixResource target = generateResource(resolvedRelation);
|
104
|
scholix.setSource(source);
|
105
|
scholix.setTarget(target);
|
106
|
scholix.setPublicationDate(LocalDateTime.now().toString());
|
107
|
restTemplate.postForLocation(String.format(BASE_CFG_URL, indexHost, indexName, generateIdentifier(result, resolvedRelation)), JsonFormat.printToString(scholix.build()));
|
108
|
|
109
|
scholix.setRelationship(ScholixRelationship.newBuilder()
|
110
|
.setInverse(rels.getRelationSemantics())
|
111
|
.setName(rels.getInverseRelation())
|
112
|
.setSchema("datacite")
|
113
|
.build());
|
114
|
scholix.setTarget(source);
|
115
|
scholix.setSource(target);
|
116
|
|
117
|
restTemplate.postForLocation(String.format(BASE_CFG_URL, indexHost, indexName, generateIdentifier(resolvedRelation, result)), JsonFormat.printToString(scholix.build()));
|
118
|
}
|
119
|
}
|
120
|
return Arc.DEFAULT_ARC;
|
121
|
}
|
122
|
|
123
|
@Override
|
124
|
protected void beforeStart(final Token token) {
|
125
|
token.setProgressProvider(this);
|
126
|
}
|
127
|
|
128
|
|
129
|
private String generateIdentifier(final String source, final String target) {
|
130
|
return AbstractDNetXsltFunctions.md5(String.format("%s::%s", source.toLowerCase().trim(), target.toLowerCase().trim()));
|
131
|
|
132
|
}
|
133
|
|
134
|
private String generateIdentifier(final DLIResolvedObject source, DLIResolvedObject target) {
|
135
|
|
136
|
return AbstractDNetXsltFunctions.md5(String.format("%s::%s", source.getPid().toLowerCase().trim(), target.getPid().toLowerCase().trim()));
|
137
|
|
138
|
}
|
139
|
|
140
|
private ScholixResource generateResource(DLIResolvedObject result) {
|
141
|
final ScholixResource.Builder builder = ScholixResource.newBuilder();
|
142
|
if (result.getDatasourceProvenance() != null)
|
143
|
result.getDatasourceProvenance().forEach(
|
144
|
objectProvenance -> {
|
145
|
builder.addCollectedFrom(ScholixCollectedFrom.newBuilder()
|
146
|
.setProvisionMode(((DLIObjectProvenance) objectProvenance).getProvisionMode())
|
147
|
.setCompletionStatus(((DLIObjectProvenance) objectProvenance).getCompletionStatus())
|
148
|
.setProvider(ScholixEntityId.newBuilder()
|
149
|
.setName(objectProvenance.getDatasource())
|
150
|
.addIdentifiers(ScholixIdentifier.newBuilder().setIdentifier(objectProvenance.getDatasourceId())
|
151
|
.setSchema("dnetIdentifier").build())
|
152
|
.build()));
|
153
|
if (StringUtils.isNotEmpty(((DLIObjectProvenance) objectProvenance).getPublisher())) {
|
154
|
builder.addPublisher(ScholixEntityId.newBuilder()
|
155
|
.setName(((DLIObjectProvenance) objectProvenance).getPublisher())
|
156
|
.build());
|
157
|
}
|
158
|
|
159
|
});
|
160
|
builder.addIdentifier(ScholixIdentifier.newBuilder().
|
161
|
setIdentifier(result.getPid())
|
162
|
.setSchema(result.getPidType())
|
163
|
.build());
|
164
|
builder.setObjectType(result.getType().toString());
|
165
|
if (result.getTitles() != null && result.getTitles().size() > 0)
|
166
|
builder.setTitle(result.getTitles().get(0));
|
167
|
if (result.getAuthors() != null)
|
168
|
result.getAuthors().forEach(author -> builder.addCreator(
|
169
|
ScholixEntityId.newBuilder()
|
170
|
.setName(author)
|
171
|
.build()));
|
172
|
if (StringUtils.isNotBlank(result.getDate())) {
|
173
|
builder.setPublicationDate(result.getDate());
|
174
|
}
|
175
|
|
176
|
String tp = null;
|
177
|
|
178
|
switch (result.getType()) {
|
179
|
case dataset:
|
180
|
tp = "60";
|
181
|
break;
|
182
|
case unknown:
|
183
|
tp = "70";
|
184
|
break;
|
185
|
case publication:
|
186
|
tp = "50";
|
187
|
break;
|
188
|
}
|
189
|
builder.setDnetIdentifier(tp + "|dnet________::" + result.getIdentifier());
|
190
|
return builder.build();
|
191
|
}
|
192
|
|
193
|
|
194
|
private DLIResolvedObject resolveRelation(final PID currentPid, final ObjectProvenance provenance) {
|
195
|
for (PIDResolver resolver : pluginResolver) {
|
196
|
final DLIResolvedObject currentIdentifier = (DLIResolvedObject) resolver.retrievePID(currentPid.getId(), currentPid.getType(), false);
|
197
|
|
198
|
if (currentIdentifier != null &&
|
199
|
!StringUtils.isBlank(currentIdentifier.getPid()) &&
|
200
|
currentIdentifier.getPid().toLowerCase().equals(currentPid.getId().toLowerCase())) {
|
201
|
return currentIdentifier;
|
202
|
}
|
203
|
}
|
204
|
|
205
|
final DLIResolvedObject resolvedObject = new DLIResolvedObject();
|
206
|
resolvedObject.setPid(currentPid.getId());
|
207
|
resolvedObject.setPidType(currentPid.getType());
|
208
|
DLIObjectProvenance resultProvenance = new DLIObjectProvenance();
|
209
|
resultProvenance.setDatasource(provenance.getDatasource());
|
210
|
resultProvenance.setDatasourceId(provenance.getDatasourceId());
|
211
|
resultProvenance.setCompletionStatus(CompletionStatus.incomplete.toString());
|
212
|
resultProvenance.setProvisionMode(ObjectProvisionMode.collected.toString());
|
213
|
resolvedObject.setDatasourceProvenance(Arrays.asList(resultProvenance));
|
214
|
return resolvedObject;
|
215
|
}
|
216
|
|
217
|
public String getInputEprParam() {
|
218
|
return inputEprParam;
|
219
|
}
|
220
|
|
221
|
public void setInputEprParam(String inputEprParam) {
|
222
|
this.inputEprParam = inputEprParam;
|
223
|
}
|
224
|
|
225
|
public String getIndexHost() {
|
226
|
return indexHost;
|
227
|
}
|
228
|
|
229
|
public void setIndexHost(String indexHost) {
|
230
|
this.indexHost = indexHost;
|
231
|
}
|
232
|
|
233
|
public String getIndexName() {
|
234
|
return indexName;
|
235
|
}
|
236
|
|
237
|
public void setIndexName(String indexName) {
|
238
|
this.indexName = indexName;
|
239
|
}
|
240
|
|
241
|
@Override
|
242
|
public String getProgressDescription() {
|
243
|
return this.counter < 0 ? "-" : String.format("%d / %d", this.counter, this.total);
|
244
|
}
|
245
|
}
|