Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import eu.dnetlib.data.bulktag.CommunityConfiguration;
4
import eu.dnetlib.data.bulktag.Pair;
5
import eu.dnetlib.data.proto.FieldTypeProtos;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.ResultProtos;
8
import org.apache.hadoop.mapreduce.Mapper;
9

    
10
import java.util.*;
11
import java.util.stream.Collectors;
12

    
13
/**
14
 * Created by miriam on 02/08/2018.
15
 */
16
public class ResultTagger {
17
    private final static String DATA_INFO_TYPE = "bulktagging::community";
18
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
19
    private final static String CLASS_ID = "bulktagging::community";
20
    private final static String SCHEMA_ID = "dnet:provenanceActions";
21
    private final static String COUNTER_GROUP = "Bulk Tagging";
22

    
23

    
24
    public OafProtos.Oaf enrichContext(final OafProtos.Oaf oaf, final CommunityConfiguration conf, final Mapper.Context context) {
25

    
26
        context.getCounter(COUNTER_GROUP, "to enrich").increment(1);
27
        final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
28

    
29

    
30
        if(oaf.getDataInfo().getDeletedbyinference()){
31
            context.getCounter(COUNTER_GROUP, "deleted by inference").increment(1);
32
            return null;
33
        }
34
        context.getCounter(COUNTER_GROUP, "not deleted by inference").increment(1);
35

    
36
        final List<ResultProtos.Result.Context> contextList = oaf.getEntity().getResult().getMetadata().getContextList();
37

    
38
        //communities contains all the communities to be added as context for the result
39
        final Set<String> communities = new HashSet<>();
40
        List<FieldTypeProtos.StructuredProperty> subjectSet = oaf.getEntity().getResult().getMetadata().getSubjectList();
41
        communities.addAll(analiseSubjects(subjectSet, conf));
42

    
43
        List<Pair<String, String>> datasourceSet = oaf.getEntity().getResult().getInstanceList()
44
                .stream()
45
                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
46
                .collect(Collectors.toList());
47

    
48
        communities.addAll(analiseDatasource(datasourceSet, conf));
49

    
50
        //TODO: add code for Zenodo Communities
51

    
52

    
53
        final Map<String,Pair<Integer,ResultProtos.Result.Context>> contexts = new HashMap<>();
54

    
55
        for(int i = 0; i<contextList.size(); i++){
56
            contexts.put(contextList.get(i).getId(),new Pair<>(i,contextList.get(i)));
57
        }
58

    
59
        for(String contextId:communities){
60
            final Pair<Integer,ResultProtos.Result.Context> pair = contexts.get(contextId);
61
            ResultProtos.Result.Context c;
62
            if(pair == null)
63
                c = null;
64
             else
65
                 c = pair.getSnd();
66
            if (c != null) {
67
                // add new dataInfo if needed
68

    
69
                    Set<String> set = new HashSet<>();
70
                    set.addAll(c.getDataInfoList().stream().map(datainfo->datainfo.getInferenceprovenance()).collect(Collectors.toList()));
71
                    if (!set.contains(DATA_INFO_TYPE)) {
72
                        builder.getEntityBuilder().getResultBuilder().getMetadataBuilder().getContextBuilder(pair.getFst()).addDataInfo(buildDataInfo());
73

    
74
                       context.getCounter(COUNTER_GROUP, "add provenance").increment(1);
75
                    }
76

    
77

    
78
            } else {
79
                builder.getEntityBuilder().getResultBuilder().getMetadataBuilder().addContext(buildContext(contextId));
80

    
81
                context.getCounter(COUNTER_GROUP, "add context").increment(1);
82

    
83
            }
84
        }
85

    
86
        return builder.build();
87
    }
88

    
89
    private Set<String> analiseDatasource(List<Pair<String, String>> datasourceSet, final CommunityConfiguration conf) {
90
        Set<String> set = new HashSet<>();
91
        for(Pair<String,String> d: datasourceSet){
92
            if(!d.getFst().equalsIgnoreCase(d.getSnd())){
93
                set.addAll(conf.getCommunityForDatasourceValue(d.getFst().substring(d.getFst().indexOf("|")+1)));
94
            }
95
            set.addAll((conf.getCommunityForDatasourceValue(d.getSnd().substring(d.getSnd().indexOf("|")+1))));
96
        }
97
        return set;
98
    }
99

    
100
    private Set<String> analiseSubjects(List<FieldTypeProtos.StructuredProperty> subjectList, final CommunityConfiguration conf) {
101
        Set<String> set = new HashSet<>();
102
        for(FieldTypeProtos.StructuredProperty s:subjectList){
103
            set.addAll(conf.getCommunityForSubjectValue( s.getValue()));
104
        }
105

    
106
        return set;
107
    }
108

    
109
    private ResultProtos.Result.Context buildContext(final String contextId) {
110
        return ResultProtos.Result.Context.newBuilder()
111
                .setId(contextId)
112
                .addDataInfo(buildDataInfo())
113
                .build();
114
    }
115

    
116
    private FieldTypeProtos.DataInfo buildDataInfo() {
117
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
118
                .setInferred(true)
119
                .setProvenanceaction(
120
                        FieldTypeProtos.Qualifier.newBuilder()
121
                                .setClassid(CLASS_ID)
122
                                .setClassname("Bulk Tagging for Communities")
123
                                .setSchemeid(SCHEMA_ID)
124
                                .setSchemename(SCHEMA_NAME))
125
                .setInferenceprovenance(DATA_INFO_TYPE)
126
                .setTrust("0.85");
127
        return builder
128
                .build();
129
    }
130

    
131

    
132
}
(3-3/3)