Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes;
2

    
3
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;

import eu.dnetlib.dli.resolver.model.*;
import eu.dnetlib.enabling.resultset.ResultSetInfo;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.procs.Token;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.pid.resolver.PIDResolver;
import eu.dnetlib.pid.resolver.model.ObjectProvenance;
import eu.dnetlib.resolver.parser.DMFResolverParser;
import eu.dnetlib.rmi.common.ResultSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

import static eu.dnetlib.data.proto.dli.ScholixObjectProtos.*;
30

    
31
public class ResolveAndIndexJobNode extends SimpleJobNode implements ProgressProvider {
32

    
33
    private static final Log log = LogFactory.getLog(ResolveAndIndexJobNode.class);
34

    
35

    
36
    private static final String BASE_CFG_URL = "http://%s:9200/%s/scholix/%s/?pretty";
37

    
38

    
39
    private String inputEprParam;
40

    
41
    private String indexHost;
42

    
43
    private String indexName;
44

    
45
    private int counter;
46

    
47
    private int total;
48

    
49

    
50
    @Autowired
51
    private List<PIDResolver> pluginResolver;
52

    
53
    @Autowired
54
    private ResultSetClient resultSetClient;
55

    
56
    @Override
57
    protected String execute(Env env) throws Exception {
58

    
59
        final ResultSet<?> rsIn = env.getAttribute(this.inputEprParam, ResultSet.class);
60

    
61

    
62
        final Iterable<String> records = resultSetClient.iter(rsIn, String.class);
63

    
64
        ResultSetInfo info = resultSetClient.info(rsIn);
65
        this.total = info.getTotal();
66

    
67
        final RestTemplate restTemplate = new RestTemplate();
68
        restTemplate.getMessageConverters()
69
                .add(0, new StringHttpMessageConverter(Charset.forName("UTF-8")));
70

    
71
        setIndexHost(indexHost);
72
        setIndexName(indexName);
73

    
74
        DMFResolverParser parser = new DMFResolverParser();
75

    
76

    
77
        for (String record : records) {
78
            this.counter++;
79
            final DLIResolvedObject result = parser.parseObject(record);
80
            if (result == null) {
81
                log.error("error on parsing " + record);
82
                continue;
83
            }
84
            for (final DLIObjectRelation rels : result.getRelations()) {
85
                final DLIResolvedObject resolvedRelation = resolveRelation(rels.getTargetPID(), result.getDatasourceProvenance().get(0));
86
                final Scholix.Builder scholix = Scholix.newBuilder();
87

    
88
                scholix.addLinkproviderBuilder()
89
                        .setName(result.getDatasourceProvenance().get(0).getDatasource())
90
                        .addIdentifiersBuilder()
91
                        .setIdentifier(result.getDatasourceProvenance().get(0).getDatasourceId())
92
                        .setSchema("dnetIdentifier");
93

    
94
                scholix.setRelationship(ScholixRelationship.newBuilder()
95
                        .setName(rels.getRelationSemantics())
96
                        .setInverse(rels.getInverseRelation())
97
                        .setSchema("datacite")
98
                        .build());
99

    
100
                final ScholixResource source = generateResource(result);
101
                final ScholixResource target = generateResource(resolvedRelation);
102
                scholix.setSource(source);
103
                scholix.setTarget(target);
104
                scholix.setPublicationDate(LocalDateTime.now().toString());
105
                restTemplate.postForLocation(String.format(BASE_CFG_URL, indexHost, indexName, generateIdentifier(result, resolvedRelation)), JsonFormat.printToString(scholix.build()));
106

    
107
                scholix.setRelationship(ScholixRelationship.newBuilder()
108
                        .setInverse(rels.getRelationSemantics())
109
                        .setName(rels.getInverseRelation())
110
                        .setSchema("datacite")
111
                        .build());
112
                scholix.setTarget(source);
113
                scholix.setSource(target);
114

    
115
                restTemplate.postForLocation(String.format(BASE_CFG_URL, indexHost, indexName, generateIdentifier(resolvedRelation, result)), JsonFormat.printToString(scholix.build()));
116
            }
117
        }
118
        return Arc.DEFAULT_ARC;
119
    }
120

    
121
    @Override
122
    protected void beforeStart(final Token token) {
123
        token.setProgressProvider(this);
124
    }
125

    
126

    
127
    private String generateIdentifier(final String source, final String target) {
128
        return AbstractDNetXsltFunctions.md5(String.format("%s::%s", source.toLowerCase().trim(), target.toLowerCase().trim()));
129

    
130
    }
131

    
132
    private String generateIdentifier(final DLIResolvedObject source, DLIResolvedObject target) {
133

    
134
        return AbstractDNetXsltFunctions.md5(String.format("%s::%s", source.getPid().toLowerCase().trim(), target.getPid().toLowerCase().trim()));
135

    
136
    }
137

    
138
    private ScholixResource generateResource(DLIResolvedObject result) {
139
        final ScholixResource.Builder builder = ScholixResource.newBuilder();
140
        if (result.getDatasourceProvenance() != null)
141
            result.getDatasourceProvenance().forEach(
142
                    objectProvenance -> {
143
                        builder.addCollectedFrom(ScholixCollectedFrom.newBuilder()
144
                                .setProvisionMode(objectProvenance.getProvisionMode())
145
                                .setCompletionStatus(objectProvenance.getCompletionStatus())
146
                                .setProvider(ScholixEntityId.newBuilder()
147
                                        .setName(objectProvenance.getDatasource())
148
                                        .addIdentifiers(ScholixIdentifier.newBuilder().setIdentifier(objectProvenance.getDatasourceId())
149
                                                .setSchema("dnetIdentifier").build())
150
                                        .build()));
151
                        if (StringUtils.isNotEmpty(objectProvenance.getPublisher())) {
152
                            builder.addPublisher(ScholixEntityId.newBuilder()
153
                                    .setName(objectProvenance.getPublisher())
154
                                    .build());
155
                        }
156

    
157
                    });
158
        builder.addIdentifier(ScholixIdentifier.newBuilder().
159
                setIdentifier(result.getPid())
160
                .setSchema(result.getPidType())
161
                .build());
162
        builder.setObjectType(result.getType().toString());
163
        if (result.getTitles() != null && result.getTitles().size() > 0)
164
            builder.setTitle(result.getTitles().get(0));
165
        if (result.getAuthors() != null)
166
            result.getAuthors().forEach(author -> builder.addCreator(
167
                    ScholixEntityId.newBuilder()
168
                            .setName(author)
169
                            .build()));
170
        if (StringUtils.isNotBlank(result.getDate())) {
171
            builder.setPublicationDate(result.getDate());
172
        }
173

    
174
        String tp = null;
175

    
176
        switch (result.getType()) {
177
            case dataset:
178
                tp = "60";
179
                break;
180
            case unknown:
181
                tp = "70";
182
                break;
183
            case publication:
184
                tp = "50";
185
                break;
186
        }
187
        builder.setDnetIdentifier(tp + "|dnet________::" + result.getIdentifier());
188
        return builder.build();
189
    }
190

    
191

    
192
    private DLIResolvedObject resolveRelation(final PID currentPid, final ObjectProvenance provenance) {
193
        for (PIDResolver resolver : pluginResolver) {
194
            final DLIResolvedObject currentIdentifier = (DLIResolvedObject) resolver.retrievePID(currentPid.getId(), currentPid.getType());
195

    
196
            if (currentIdentifier != null &&
197
                    !StringUtils.isBlank(currentIdentifier.getPid()) &&
198
                    currentIdentifier.getPid().toLowerCase().equals(currentPid.getId().toLowerCase())) {
199
                return currentIdentifier;
200
            }
201
        }
202

    
203
        final DLIResolvedObject resolvedObject = new DLIResolvedObject();
204
        resolvedObject.setPid(currentPid.getId());
205
        resolvedObject.setPidType(currentPid.getType());
206
        DLIObjectProvenance resultProvenance = new DLIObjectProvenance();
207
        resultProvenance.setDatasource(provenance.getDatasource());
208
        resultProvenance.setDatasourceId(provenance.getDatasourceId());
209
        resultProvenance.setCompletionStatus(CompletionStatus.incomplete.toString());
210
        resultProvenance.setProvisionMode(ObjectProvisionMode.collected.toString());
211
        resolvedObject.setDatasourceProvenance(Arrays.asList(resultProvenance));
212
        return resolvedObject;
213
    }
214

    
215
    public String getInputEprParam() {
216
        return inputEprParam;
217
    }
218

    
219
    public void setInputEprParam(String inputEprParam) {
220
        this.inputEprParam = inputEprParam;
221
    }
222

    
223
    public String getIndexHost() {
224
        return indexHost;
225
    }
226

    
227
    public void setIndexHost(String indexHost) {
228
        this.indexHost = indexHost;
229
    }
230

    
231
    public String getIndexName() {
232
        return indexName;
233
    }
234

    
235
    public void setIndexName(String indexName) {
236
        this.indexName = indexName;
237
    }
238

    
239
    @Override
240
    public String getProgressDescription() {
241
        return this.counter < 0 ? "-" : String.format("%d / %d", this.counter, this.total);
242
    }
243
}
(5-5/5)