Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes;
2

    
3
import com.googlecode.protobuf.format.JsonFormat;
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
import eu.dnetlib.dli.resolver.PIDResolver;
import eu.dnetlib.dli.resolver.model.*;
import eu.dnetlib.enabling.resultset.ResultSetInfo;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.msro.workflows.procs.Token;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.resolver.parser.DMFResolverParser;
import eu.dnetlib.rmi.common.ResultSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.web.client.RestTemplate;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

import static eu.dnetlib.data.proto.dli.ScholixObjectProtos.*;
28

    
29
public class ResolveAndIndexJobNode extends SimpleJobNode implements ProgressProvider {
30

    
31
    private static final Log log = LogFactory.getLog(ResolveAndIndexJobNode.class);
32

    
33

    
34
    private static final String BASE_CFG_URL = "http://%s:9200/%s/scholix/%s/?pretty";
35

    
36

    
37
    private String inputEprParam;
38

    
39
    private String indexHost;
40

    
41
    private String indexName;
42

    
43
    private int counter;
44

    
45
    private int total;
46

    
47

    
48
    @Autowired
49
    private List<PIDResolver> pluginResolver;
50

    
51
    @Autowired
52
    private ResultSetClient resultSetClient;
53

    
54
    @Override
55
    protected String execute(Env env) throws Exception {
56

    
57
        final ResultSet<?> rsIn = env.getAttribute(this.inputEprParam, ResultSet.class);
58

    
59

    
60
        final Iterable<String> records = resultSetClient.iter(rsIn, String.class);
61

    
62
        ResultSetInfo info = resultSetClient.info(rsIn);
63
        this.total = info.getTotal();
64

    
65
        final RestTemplate restTemplate = new RestTemplate();
66
        restTemplate.getMessageConverters()
67
                .add(0, new StringHttpMessageConverter(Charset.forName("UTF-8")));
68

    
69
        setIndexHost(indexHost);
70
        setIndexName(indexName);
71

    
72
        DMFResolverParser parser = new DMFResolverParser();
73

    
74

    
75
        for (String record : records) {
76
            this.counter++;
77
            final ResolvedObject result = parser.parseObject(record);
78
            if (result == null) {
79
                log.error("error on parsing " + record);
80
                continue;
81
            }
82
            for (final ObjectRelation rels : result.getRelations()) {
83
                final ResolvedObject resolvedRelation = resolveRelation(rels.getTargetPID(), result.getDatasourceProvenance().get(0));
84
                final Scholix.Builder scholix = Scholix.newBuilder();
85

    
86
                scholix.addLinkproviderBuilder()
87
                        .setName(result.getDatasourceProvenance().get(0).getDatasource())
88
                        .addIdentifiersBuilder()
89
                        .setIdentifier(result.getDatasourceProvenance().get(0).getDatasourceId())
90
                        .setSchema("dnetIdentifier");
91

    
92
                scholix.setRelationship(ScholixRelationship.newBuilder()
93
                        .setName(rels.getRelationSemantics())
94
                        .setInverse(rels.getInverseRelation())
95
                        .setSchema("datacite")
96
                        .build());
97

    
98
                final ScholixResource source = generateResource(result);
99
                final ScholixResource target = generateResource(resolvedRelation);
100
                scholix.setSource(source);
101
                scholix.setTarget(target);
102
                scholix.setPublicationDate(LocalDateTime.now().toString());
103
                restTemplate.postForLocation(String.format(BASE_CFG_URL, indexHost, indexName, generateIdentifier(result, resolvedRelation)), JsonFormat.printToString(scholix.build()));
104

    
105
                scholix.setRelationship(ScholixRelationship.newBuilder()
106
                        .setInverse(rels.getRelationSemantics())
107
                        .setName(rels.getInverseRelation())
108
                        .setSchema("datacite")
109
                        .build());
110
                scholix.setTarget(source);
111
                scholix.setSource(target);
112

    
113
                restTemplate.postForLocation(String.format(BASE_CFG_URL, indexHost, indexName, generateIdentifier(resolvedRelation, result)), JsonFormat.printToString(scholix.build()));
114
            }
115
        }
116
        return Arc.DEFAULT_ARC;
117
    }
118

    
119
    @Override
120
    protected void beforeStart(final Token token) {
121
        token.setProgressProvider(this);
122
    }
123

    
124

    
125
    private String generateIdentifier(final String source, final String target) {
126
        return AbstractDNetXsltFunctions.md5(String.format("%s::%s", source.toLowerCase().trim(), target.toLowerCase().trim()));
127

    
128
    }
129

    
130
    private String generateIdentifier(final ResolvedObject source, ResolvedObject target) {
131

    
132
        return AbstractDNetXsltFunctions.md5(String.format("%s::%s", source.getPid().toLowerCase().trim(), target.getPid().toLowerCase().trim()));
133

    
134
    }
135

    
136
    private ScholixResource generateResource(ResolvedObject result) {
137
        final ScholixResource.Builder builder = ScholixResource.newBuilder();
138
        if (result.getDatasourceProvenance() != null)
139
            result.getDatasourceProvenance().forEach(
140
                    objectProvenance -> {
141
                        builder.addCollectedFrom(ScholixCollectedFrom.newBuilder()
142
                                .setProvisionMode(objectProvenance.getProvisionMode())
143
                                .setCompletionStatus(objectProvenance.getCompletionStatus())
144
                                .setProvider(ScholixEntityId.newBuilder()
145
                                        .setName(objectProvenance.getDatasource())
146
                                        .addIdentifiers(ScholixIdentifier.newBuilder().setIdentifier(objectProvenance.getDatasourceId())
147
                                                .setSchema("dnetIdentifier").build())
148
                                        .build()));
149
                        if (StringUtils.isNotEmpty(objectProvenance.getPublisher())) {
150
                            builder.addPublisher(ScholixEntityId.newBuilder()
151
                                    .setName(objectProvenance.getPublisher())
152
                                    .build());
153
                        }
154

    
155
                    });
156
        builder.addIdentifier(ScholixIdentifier.newBuilder().
157
                setIdentifier(result.getPid())
158
                .setSchema(result.getPidType())
159
                .build());
160
        builder.setObjectType(result.getType().toString());
161
        if (result.getTitles() != null && result.getTitles().size() > 0)
162
            builder.setTitle(result.getTitles().get(0));
163
        if (result.getAuthors() != null)
164
            result.getAuthors().forEach(author -> builder.addCreator(
165
                    ScholixEntityId.newBuilder()
166
                            .setName(author)
167
                            .build()));
168
        if (StringUtils.isNotBlank(result.getDate())) {
169
            builder.setPublicationDate(result.getDate());
170
        }
171

    
172
        String tp = null;
173

    
174
        switch (result.getType()) {
175
            case dataset:
176
                tp = "60";
177
                break;
178
            case unknown:
179
                tp = "70";
180
                break;
181
            case publication:
182
                tp = "50";
183
                break;
184
        }
185
        builder.setDnetIdentifier(tp + "|dnet________::" + result.getIdentifier());
186
        return builder.build();
187
    }
188

    
189

    
190
    private ResolvedObject resolveRelation(final PID currentPid, final ObjectProvenance provenance) {
191
        for (PIDResolver resolver : pluginResolver) {
192
            final ResolvedObject currentIdentifier = resolver.retrievePID(currentPid.getId(), currentPid.getType());
193

    
194
            if (currentIdentifier != null &&
195
                    !StringUtils.isBlank(currentIdentifier.getPid()) &&
196
                    currentIdentifier.getPid().toLowerCase().equals(currentPid.getId().toLowerCase())) {
197
                return currentIdentifier;
198
            }
199
        }
200

    
201
        final ResolvedObject resolvedObject = new ResolvedObject();
202
        resolvedObject.setPid(currentPid.getId());
203
        resolvedObject.setPidType(currentPid.getType());
204
        ObjectProvenance resultProvenance = new ObjectProvenance();
205
        resultProvenance.setDatasource(provenance.getDatasource());
206
        resultProvenance.setDatasourceId(provenance.getDatasourceId());
207
        resultProvenance.setCompletionStatus(CompletionStatus.incomplete.toString());
208
        resultProvenance.setProvisionMode(ObjectProvisionMode.collected.toString());
209
        resolvedObject.setDatasourceProvenance(Arrays.asList(resultProvenance));
210
        return resolvedObject;
211
    }
212

    
213
    public String getInputEprParam() {
214
        return inputEprParam;
215
    }
216

    
217
    public void setInputEprParam(String inputEprParam) {
218
        this.inputEprParam = inputEprParam;
219
    }
220

    
221
    public String getIndexHost() {
222
        return indexHost;
223
    }
224

    
225
    public void setIndexHost(String indexHost) {
226
        this.indexHost = indexHost;
227
    }
228

    
229
    public String getIndexName() {
230
        return indexName;
231
    }
232

    
233
    public void setIndexName(String indexName) {
234
        this.indexName = indexName;
235
    }
236

    
237
    @Override
238
    public String getProgressDescription() {
239
        return this.counter < 0 ? "-" : String.format("%d / %d", this.counter, this.total);
240
    }
241
}
(5-5/5)