Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import com.google.gson.Gson;
4
import com.google.gson.JsonElement;
5
import com.google.gson.JsonObject;
6
import com.googlecode.protobuf.format.JsonFormat;
7
import eu.dnetlib.actionmanager.actions.ActionFactory;
8
import eu.dnetlib.actionmanager.actions.AtomicAction;
9
import eu.dnetlib.actionmanager.common.Agent;
10
import eu.dnetlib.data.mapreduce.util.StreamUtils;
11
import eu.dnetlib.data.proto.*;
12
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
13
import eu.dnetlib.miscutils.collections.Pair;
14
import org.apache.commons.io.IOUtils;
15
import org.apache.commons.lang3.StringUtils;
16
import java.io.IOException;
17
import java.io.InputStream;
18
import java.util.*;
19
import java.util.concurrent.atomic.AtomicInteger;
20
import java.util.stream.Collectors;
21

    
22
import static eu.dnetlib.data.mapreduce.hbase.dataimport.DumpToActionsUtility.*;
23
import static eu.dnetlib.data.proto.ResultOrganizationProtos.ResultOrganization;
24

    
25

    
26
public class DOIBoostToActions {
27

    
28
    /**
     * Datasources known to this importer, indexed by the provenance label used in the dump.
     * For each entry the pair key is the datasource display name and the pair value is the
     * datasource identifier in the form {@code <namespace prefix>::<local name>}.
     */
    private static final Map<String, Pair<String, String>> datasources = new HashMap<>();

    /**
     * Qualifiers assigned to organization identifiers, indexed by the identifier schema
     * (the special key {@code MAG} is used for Microsoft Academic identifiers).
     */
    private static final Map<String, FieldTypeProtos.Qualifier> affiliationPIDType = new HashMap<>();

    // Plain static initializer instead of double-brace initialization, which would create
    // one anonymous HashMap subclass per map.
    static {
        datasources.put("MAG", new Pair<>("Microsoft Academic Graph", "openaire____::microsoft"));
        datasources.put("ORCID", new Pair<>("ORCID", "openaire____::orcid"));
        datasources.put("CrossRef", new Pair<>("Crossref", "openaire____::crossref"));
        datasources.put("UnpayWall", new Pair<>("UnpayWall", "openaire____::unpaywall"));

        affiliationPIDType.put("MAG", FieldTypeProtos.Qualifier.newBuilder()
                .setClassid("mag_id")
                .setClassname("Microsoft Academic Graph Identifier")
                .setSchemename("dnet:pid_types")
                .setSchemeid("dnet:pid_types")
                .build());
        affiliationPIDType.put("grid.ac", getQualifier("grid", "dnet:pid_types"));
        // NOTE(review): the key is spelled "wikpedia"; it is kept as-is because it must match the
        // schema value found in the input records - confirm against the dump before renaming.
        affiliationPIDType.put("wikpedia", getQualifier("urn", "dnet:pid_types"));
    }
41

    
42
    /**
     * Mapping of the publication types found in the dump, loaded once at class initialization
     * from a JSON resource on the classpath. Each entry is looked up by type and read for its
     * {@code value} and {@code cobj} properties.
     */
    static Map<String, Map<String, String>> typologiesMapping;

    static {
        typologiesMapping = loadTypologiesMapping("/eu/dnetlib/data/mapreduce/hbase/dataimport/mapping_typologies.json");
    }

    /**
     * Reads the typologies mapping from the given classpath resource.
     *
     * @param resourcePath absolute classpath location of the JSON mapping
     * @return the parsed mapping, never {@code null}
     * @throws IllegalStateException if the resource is missing, unreadable or empty; failing here
     *                               gives an explicit message instead of a later NullPointerException
     *                               on the first lookup in the mapping
     */
    @SuppressWarnings("unchecked") // Gson can only be given the raw Map class here
    private static Map<String, Map<String, String>> loadTypologiesMapping(final String resourcePath) {
        // try-with-resources closes the stream; a null resource is tolerated by the construct
        try (InputStream is = DOIBoostToActions.class.getResourceAsStream(resourcePath)) {
            if (is == null) {
                throw new IllegalStateException("Missing classpath resource: " + resourcePath);
            }
            // explicit charset: the mapping must not depend on the platform default encoding
            final Map<String, Map<String, String>> mapping = new Gson().fromJson(IOUtils.toString(is, "UTF-8"), Map.class);
            if (mapping == null) {
                throw new IllegalStateException("Empty typologies mapping in classpath resource: " + resourcePath);
            }
            return mapping;
        } catch (IOException e) {
            throw new IllegalStateException("Unable to read classpath resource: " + resourcePath, e);
        }
    }

    /** Namespace prefix used to build the identifiers of the results created by this importer. */
    final static String doiBoostNSPREFIX ="doiboost____";
55

    
56

    
57
    public static List<AtomicAction> generatePublicationActionsFromDump(final JsonObject rootElement, final ActionFactory factory, final String setName, final Agent agent, boolean invisible) {
58

    
59
        //Create OAF Proto
60

    
61
        final OafProtos.Oaf.Builder oaf = OafProtos.Oaf.newBuilder();
62
        //Add Data Info
63
        oaf.setDataInfo(FieldTypeProtos.DataInfo.newBuilder()
64
                .setInvisible(invisible)
65
                .setDeletedbyinference(false)
66
                .setInferred(false)
67
                .setTrust("0.9")
68
                .setProvenanceaction(getQualifier("sysimport:actionset", "dnet:provenanceActions"))
69
                .build());
70

    
71
        //Adding Kind
72
        oaf.setKind(KindProtos.Kind.entity);
73

    
74
        //creating Result Proto
75
        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder().setType(TypeProtos.Type.result);
76

    
77
        entity.setDateofcollection("2018-10-10");
78

    
79
        if (rootElement.has("collectedFrom") && rootElement.get("collectedFrom").isJsonArray()){
80
            StreamUtils.toStream(rootElement.getAsJsonArray("collectedFrom").iterator())
81
                    .map(JsonElement::getAsString)
82
                    .forEach(cf ->
83
                            {
84
                                final String id =datasources.get(cf).getValue();
85
                                final String name =datasources.get(cf).getKey();
86
                                if (StringUtils.isNotBlank(id) && StringUtils.isNotBlank(name)) {
87
                                    final FieldTypeProtos.KeyValue collectedFrom = FieldTypeProtos.KeyValue.newBuilder()
88
                                            .setValue(name)
89
                                            .setKey("10|openaire____::" + AbstractDNetXsltFunctions.md5(StringUtils.substringAfter(id, "::")))
90
                                            .build();
91
                                    entity.addCollectedfrom(collectedFrom);
92
                                }
93
                            }
94
                    );
95
        }
96
        //Adding identifier
97
        final String doi = getStringValue(rootElement, "doi");
98
        if (doi == null)
99
            return null;
100
        final String sourceId = String.format("50|%s::%s", doiBoostNSPREFIX, AbstractDNetXsltFunctions.md5(doi));
101
        entity.setId(sourceId);
102

    
103
        entity.addPid(FieldTypeProtos.StructuredProperty.newBuilder()
104
                .setValue(doi)
105
                .setQualifier(getQualifier("doi", "dnet:pid_types"))
106
                .build());
107

    
108

    
109
        //Create Result Field
110
        ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder();
111

    
112
        final String type = getStringValue(rootElement,"type");
113

    
114
        if (!typologiesMapping.containsKey(type))
115
            return null;
116

    
117
        //Adding Instances
118
        final String typeValue = typologiesMapping.get(type).get("value");
119
        final String cobjValue = typologiesMapping.get(type).get("cobj");
120

    
121

    
122
        getArrayObjects(rootElement, "instances").stream().map(it ->
123
                {
124
                    ResultProtos.Result.Instance.Builder instance= ResultProtos.Result.Instance.newBuilder();
125
                    instance.setInstancetype(FieldTypeProtos.Qualifier.newBuilder()
126
                            .setClassid(cobjValue)
127
                            .setClassname(typeValue)
128
                            .setSchemeid("dnet:publication_resource")
129
                            .setSchemename("dnet:publication_resource")
130
                            .build());
131
                    instance.setHostedby(FieldTypeProtos.KeyValue.newBuilder()
132
                            .setKey("10|openaire____::55045bd2a65019fd8e6741a755395c8c")
133
                            .setValue("Unknown Repository")
134
                            .build());
135

    
136
                    final String acc_class_id =it.get("access-rights").getAsString();
137
                    String acc_class_value;
138
                    switch (acc_class_id){
139
                        case "OPEN": {
140
                            acc_class_value = "open access";
141
                            break;
142
                        }
143
                        case "CLOSED": {
144
                            acc_class_value = "closed access";
145
                            break;
146
                        }
147

    
148
                        default: {
149
                            acc_class_value = "not available";
150
                        }
151

    
152
                    }
153

    
154
                    instance.addUrl(it.get("url").getAsString());
155
                    instance.setAccessright(FieldTypeProtos.Qualifier.newBuilder()
156
                            .setClassid(acc_class_id)
157
                            .setClassname(acc_class_value)
158
                            .setSchemeid("dnet:access_modes")
159
                            .setSchemename("dnet:access_modes")
160
                            .build());
161

    
162
                    final String id =datasources.get(it.get("provenance").getAsString()).getValue();
163
                    final String name =datasources.get(it.get("provenance").getAsString()).getKey();
164
                    if (StringUtils.isNotBlank(id) && StringUtils.isNotBlank(name)) {
165
                        final FieldTypeProtos.KeyValue collectedFrom = FieldTypeProtos.KeyValue.newBuilder()
166
                                .setValue(name)
167
                                .setKey("10|openaire____::" + AbstractDNetXsltFunctions.md5(StringUtils.substringAfter(id, "::")))
168
                                .build();
169

    
170
                        instance.setCollectedfrom(collectedFrom);
171
                    }
172

    
173
                    return  instance.build();
174
                }).forEach(result::addInstance);
175

    
176
        //Adding DOI URL as  Instance
177
        final String doiURL = getStringValue(rootElement, "doi-url");
178
        if (StringUtils.isNotBlank(doiURL)) {
179

    
180

    
181
        final ResultProtos.Result.Instance.Builder instance = ResultProtos.Result.Instance.newBuilder();
182
        instance.addUrl(doiURL);
183
            instance.setAccessright(FieldTypeProtos.Qualifier.newBuilder()
184
                    .setClassid("CLOSED")
185
                    .setClassname("Closed Access")
186
                    .setSchemeid("dnet:access_modes")
187
                    .setSchemename("dnet:access_modes")
188
                    .build());
189
            instance.setCollectedfrom(FieldTypeProtos.KeyValue.newBuilder()
190
                    .setValue("CrossRef")
191
                    .setKey("10|openaire____::" + AbstractDNetXsltFunctions.md5("crossref"))
192
                    .build());
193
            result.addInstance(instance);
194
        }
195

    
196
        //Create Metadata Proto
197
        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
198

    
199

    
200
        Pair<List<FieldTypeProtos.Author>, Collection<OafProtos.Oaf>> authorsOrganizations = createAuthorsOrganization(rootElement);
201

    
202
        if (authorsOrganizations.getKey().size() > 0) {
203
            metadata.addAllAuthor(authorsOrganizations.getKey());
204
        }
205
        //adding Language
206
        metadata.setLanguage(FieldTypeProtos.Qualifier.newBuilder()
207
                .setClassid("und")
208
                .setClassname("Undetermined")
209
                .setSchemeid("dent:languages")
210
                .setSchemename("dent:languages")
211
                .build());
212

    
213
        //Adding subjects
214
        List<String> subjects =getArrayValues(rootElement, "subject");
215

    
216
        subjects.forEach(s-> metadata.addSubject(FieldTypeProtos.StructuredProperty.newBuilder()
217
                .setValue(s)
218
                .setQualifier(getQualifier("keyword", "dnet:subject"))
219
                .build()));
220

    
221
        List<String>titles =getArrayValues(rootElement, "title");
222
        titles.forEach(t->
223
                metadata.addTitle(FieldTypeProtos.StructuredProperty.newBuilder()
224
                        .setValue(t)
225
                        .setQualifier(getQualifier("main title", "dnet:dataCite_title"))
226
                        .build()));
227
        settingRelevantDate(rootElement, metadata, "issued", "issued", true);
228
        settingRelevantDate(rootElement, metadata, "accepted", "accepted", false);
229
        settingRelevantDate(rootElement, metadata, "published-online", "published-online", false);
230
        settingRelevantDate(rootElement, metadata, "published-print", "published-print", false);
231

    
232

    
233
        getArrayObjects(rootElement, "abstract").forEach(d -> metadata.addDescription(FieldTypeProtos.StringField.newBuilder().setValue(d.get("value").getAsString()).build()));
234

    
235

    
236

    
237
        //Adding Journal
238
        final String publisher = getStringValue(rootElement,"publisher");
239
        if (StringUtils.isNotBlank(publisher)){
240

    
241
            final ResultProtos.Result.Journal.Builder journal = ResultProtos.Result.Journal.newBuilder().setName(publisher);
242

    
243
            if (hasJSONArrayField(rootElement,"issn" )){
244
                StreamUtils.toStream(rootElement.getAsJsonArray("issn").iterator())
245
                        .map(JsonElement::getAsJsonObject)
246
                        .forEach(it -> {
247
                            final String issntype = getStringValue(it, "type");
248
                            final String value = getStringValue(it, "value");
249
                            if("electronic".equals(issntype)){
250
                                journal.setIssnOnline(value);
251
                            }
252
                            if ("print".equals(issntype))
253
                                journal.setIssnPrinted(value);
254
                        });
255
            }
256
            metadata.setJournal(journal.build());
257
        }
258
        metadata.setResulttype(getQualifier(getDefaultResulttype(cobjValue), "dnet:result_typologies"));
259
        result.setMetadata(metadata.build());
260
        entity.setResult(result.build());
261
        oaf.setEntity(entity.build());
262
        final List<AtomicAction> actionList = new ArrayList<>();
263
        actionList.add(factory.createAtomicAction(setName, agent, oaf.getEntity().getId(), "result", "body", oaf.build().toByteArray()));
264

    
265
        if (!authorsOrganizations.getValue().isEmpty()) {
266

    
267
            authorsOrganizations.getValue().forEach(o ->
268
                    {
269
                        actionList.add(factory.createAtomicAction(setName, agent, o.getEntity().getId(), "organization", "body", o.toByteArray()));
270
                        actionList.addAll(createPublicationOrganizationRelation(oaf.build(), o, factory, setName, agent));
271
                        final String gridOrganization = getSimilarGridOrganization(o.getEntity());
272
                        if (gridOrganization!= null) {
273
                            actionList.add(factory.createAtomicAction(setName, agent, o.getEntity().getId(), "organizationOrganization_dedupSimilarity_isSimilarTo", gridOrganization, "".getBytes()));
274
                            actionList.add(factory.createAtomicAction(setName, agent, gridOrganization, "organizationOrganization_dedupSimilarity_isSimilarTo", o.getEntity().getId(), "".getBytes()));
275
                        }
276
                    });
277
        }
278

    
279
        return actionList;
280

    
281
    }
282

    
283

    
284
    private static String getSimilarGridOrganization(final OafProtos.OafEntity organization) {
285

    
286
        final List<FieldTypeProtos.StructuredProperty> pidList = organization.getPidList();
287
        if (pidList!= null ) {
288
            for (FieldTypeProtos.StructuredProperty p: pidList) {
289
                if (p.getQualifier().getClassname().equals("grid")){
290
                    return "20|grid________::"+AbstractDNetXsltFunctions.md5(p.getValue());
291
                }
292
            }
293
        }
294
        return null;
295

    
296
    }
297

    
298
    private static List<AtomicAction> createPublicationOrganizationRelation(final OafProtos.Oaf publication, final OafProtos.Oaf organization, final ActionFactory factory, final String setName, final Agent agent) {
299

    
300
        List<AtomicAction> result = new ArrayList<>();
301

    
302
        final OafProtos.Oaf.Builder roaf = OafProtos.Oaf.newBuilder();
303
        roaf.setKind(KindProtos.Kind.relation);
304

    
305
        roaf.setDataInfo(FieldTypeProtos.DataInfo.newBuilder()
306
                .setInvisible(false)
307
                .setDeletedbyinference(false)
308
                .setInferred(false)
309
                .setTrust("0.9")
310
                .setProvenanceaction(getQualifier("sysimport:actionset", "dnet:provenanceActions"))
311
                .build());
312

    
313

    
314
        final OafProtos.OafRel.Builder rel = OafProtos.OafRel.newBuilder();
315

    
316
        rel.setRelType(RelTypeProtos.RelType.resultOrganization);
317
        rel.setSubRelType(RelTypeProtos.SubRelType.affiliation);
318

    
319
        //Create a relation Result --> Organization
320
        rel.setSource(publication.getEntity().getId());
321
        rel.setTarget(organization.getEntity().getId());
322
        rel.setRelClass(ResultOrganization.Affiliation.RelName.hasAuthorInstitution.toString());
323

    
324
        final ResultOrganization.Builder rel_instance = ResultOrganization.newBuilder();
325

    
326
        final ResultOrganization.Affiliation.Builder affiliationRel = ResultOrganization.Affiliation.newBuilder();
327
        affiliationRel.setRelMetadata(RelMetadataProtos.RelMetadata.newBuilder()
328
                .setSemantics(getQualifier("hasAuthorInstitution", "dnet:result_organization_relations"))
329
                .build());
330
        rel_instance.setAffiliation(affiliationRel.build());
331
        rel.setResultOrganization(rel_instance.build());
332

    
333
        rel.addCollectedfrom(FieldTypeProtos.KeyValue.newBuilder()
334
                .setValue(datasources.get("MAG").getKey())
335
                .setKey("10|openaire____::" + AbstractDNetXsltFunctions.md5(StringUtils.substringAfter(datasources.get("MAG").getValue(), "::")))
336
                .build());
337

    
338

    
339

    
340
        rel.setChild(false);
341
        roaf.setRel(rel.build());
342

    
343
        result.add(factory.createAtomicAction(setName, agent, publication.getEntity().getId(), "resultOrganization_affiliation_hasAuthorInstitution", organization.getEntity().getId(), roaf.build().toByteArray() ));
344

    
345

    
346
        //Create a relation Organization --> Result
347
        rel.setTarget(publication.getEntity().getId());
348
        rel.setSource(organization.getEntity().getId());
349
        rel.setRelClass(ResultOrganization.Affiliation.RelName.isAuthorInstitutionOf.toString());
350

    
351

    
352
        affiliationRel.setRelMetadata(RelMetadataProtos.RelMetadata.newBuilder()
353
                .setSemantics(getQualifier("isAuthorInstitutionOf", "dnet:result_organization_relations"))
354
                .build());
355
        rel_instance.setAffiliation(affiliationRel.build());
356
        rel.setResultOrganization(rel_instance.build());
357
        roaf.setRel(rel.build());
358
        result.add(factory.createAtomicAction(setName, agent, organization.getEntity().getId(), "resultOrganization_affiliation_isAuthorInstitutionOf", publication.getEntity().getId(), roaf.build().toByteArray()));
359

    
360
        return result;
361

    
362
    }
363

    
364
    private static boolean hasJSONArrayField(final JsonObject root, final String key) {
365
        return root.has(key) && root.get(key).isJsonArray();
366
    }
367

    
368
    private static void settingRelevantDate(JsonObject rootElement, ResultProtos.Result.Metadata.Builder metadata , final String jsonKey, final String dictionaryKey, final boolean addToDateOfAcceptance) {
369
        //Adding date
370
        String date = getStringValue(rootElement,jsonKey);
371
        if (date == null)
372
            return;
373
        if (date.length() == 4) {
374
            date += "-01-01";
375
        }
376
        if (isValidDate(date)) {
377
            if (addToDateOfAcceptance)
378
                metadata.setDateofacceptance(FieldTypeProtos.StringField.newBuilder().setValue(date).build());
379
            metadata.addRelevantdate(FieldTypeProtos.StructuredProperty.newBuilder()
380
                    .setValue(date)
381
                    .setQualifier(getQualifier(dictionaryKey,"dnet:dataCite_date"))
382
                    .build());
383
        }
384
    }
385

    
386

    
387
    public static FieldTypeProtos.KeyValue extractIdentifier(final String value) {
388
        FieldTypeProtos.KeyValue.Builder pid = FieldTypeProtos.KeyValue.newBuilder();
389
        if (StringUtils.contains(value, "orcid.org")){
390
            return pid.setValue(value)
391
                    .setKey("ORCID").build();
392
        }
393
        if (StringUtils.contains(value, "academic.microsoft.com/#/detail")){
394
            return pid.setValue(value)
395
                    .setKey("MAG Identifier").build();
396
        }
397
        return pid.setValue(value)
398
                .setKey("URL").build();
399
    }
400

    
401

    
402
    public static OafProtos.Oaf createOrganizationFromJSON(final JsonObject affiliation) {
403
        final Map<String, FieldTypeProtos.Qualifier> affiliationIdentifiers = new HashMap<>();
404
        final List<String> magId = new ArrayList<>();
405
        getArrayObjects(affiliation, "identifiers").forEach(it -> {
406
            if (StringUtils.contains(it.get("value").getAsString(), "academic.microsoft.com")) {
407
                affiliationIdentifiers.put(it.get("value").getAsString(), affiliationPIDType.get("MAG"));
408
                magId.add(it.get("value").getAsString());
409
            }
410
            else
411
                affiliationIdentifiers.put( it.get("value").getAsString(), affiliationPIDType.get(it.get("schema").getAsString()));
412
        });
413
        if (magId.size() > 0) {
414
            final String microsoftID = magId.get(0);
415
            OafProtos.Oaf.Builder oaf = OafProtos.Oaf.newBuilder();
416
            oaf.setKind(KindProtos.Kind.entity);
417
            OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder();
418
            entity.setType(TypeProtos.Type.organization);
419
            entity.setId("20|microsoft___::"+AbstractDNetXsltFunctions.md5(microsoftID));
420
            final String id =datasources.get(affiliation.get("provenance").getAsString()).getValue();
421
            final String name =datasources.get(affiliation.get("provenance").getAsString()).getKey();
422
            if (StringUtils.isNotBlank(id) && StringUtils.isNotBlank(name)) {
423
                final FieldTypeProtos.KeyValue collectedFrom = FieldTypeProtos.KeyValue.newBuilder()
424
                        .setValue(name)
425
                        .setKey("10|openaire____::" + AbstractDNetXsltFunctions.md5(StringUtils.substringAfter(id, "::")))
426
                        .build();
427
                entity.addCollectedfrom(collectedFrom);
428
            } else {
429
                return null;
430
            }
431
            entity.addOriginalId(microsoftID);
432

    
433
            affiliationIdentifiers.forEach((key, value) -> entity.addPid(
434
                    FieldTypeProtos.StructuredProperty.newBuilder()
435
                            .setQualifier(value)
436
                            .setValue(key)
437
                            .build()));
438

    
439
            final OrganizationProtos.Organization.Builder organization = OrganizationProtos.Organization.newBuilder();
440
            organization.setMetadata(OrganizationProtos.Organization.Metadata.newBuilder()
441
                    .setWebsiteurl(FieldTypeProtos.StringField.newBuilder().setValue(affiliation.get("official-page").getAsString()).build())
442
                    .setLegalname(FieldTypeProtos.StringField.newBuilder().setValue(affiliation.get("value").getAsString()).build())
443
                    .build());
444

    
445
            entity.setOrganization(organization);
446
            oaf.setEntity(entity);
447
            oaf.setDataInfo(FieldTypeProtos.DataInfo.newBuilder()
448
                    .setInvisible(false)
449
                    .setDeletedbyinference(false)
450
                    .setInferred(false)
451
                    .setTrust("0.9")
452
                    .setProvenanceaction(getQualifier("sysimport:actionset", "dnet:provenanceActions"))
453
                    .build());
454
            return oaf.build();
455
        }
456
        return  null;
457
    }
458

    
459
    public static Pair<List<FieldTypeProtos.Author>, Collection<OafProtos.Oaf>>  createAuthorsOrganization(final JsonObject root) {
460

    
461
        final Map<String, OafProtos.Oaf> affiliations = new HashMap<>();
462

    
463
        List<JsonObject> authors = getArrayObjects(root, "authors");
464

    
465
        final AtomicInteger counter = new AtomicInteger();
466

    
467
        List<FieldTypeProtos.Author> collect = authors.stream().map(author -> {
468
            final String given = getStringValue(author, "given");
469
            final String family = getStringValue(author, "family");
470
            String fullname = getStringValue(author, "fullname");
471

    
472
            if (StringUtils.isBlank(fullname) && StringUtils.isNotBlank(given) && StringUtils.isNotBlank(family)) {
473
                fullname = String.format("%s %s", given, family);
474
            }
475
            final FieldTypeProtos.Author.Builder abuilder = FieldTypeProtos.Author.newBuilder();
476

    
477
            if (StringUtils.isNotBlank(given))
478
                abuilder.setName(given);
479
            if (StringUtils.isNotBlank(family))
480
            abuilder.setSurname(family);
481
            if (StringUtils.isNotBlank(fullname))
482
                abuilder.setFullname(fullname);
483

    
484
            final List<JsonObject> identifiers = getArrayObjects(author, "identifiers");
485
            final List<JsonObject> authorAffiliation = getArrayObjects(author, "affiliations");
486

    
487
            authorAffiliation.forEach(it ->
488
            {
489
                OafProtos.Oaf org = createOrganizationFromJSON(it);
490
                if (org != null) {
491
                    affiliations.put(org.getEntity().getId(), org);
492
                    abuilder.addAffiliation(org.getEntity().getOrganization().getMetadata().getLegalname());
493
                }
494
            });
495
            identifiers.stream().map(id -> {
496
                final String value = id.get("value").getAsString();
497
                return extractIdentifier(value);
498
            }).forEach(abuilder::addPid);
499
            abuilder.setRank(counter.getAndIncrement());
500

    
501
            return abuilder.build();
502

    
503
        }).collect(Collectors.toList());
504

    
505
        return new Pair<> ( collect,affiliations.values() );
506
    }
507

    
508

    
509

    
510

    
511

    
512

    
513
}
(3-3/14)