Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import eu.dnetlib.data.bulktag.CommunityConfiguration;
4
import eu.dnetlib.data.bulktag.Pair;
5
import eu.dnetlib.data.proto.FieldTypeProtos;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.ResultProtos;
8
import org.apache.hadoop.mapreduce.Mapper;
9

    
10
import java.util.*;
11
import java.util.stream.Collectors;
12

    
13
/**
14
 * Created by miriam on 02/08/2018.
15
 */
16
public class ResultTagger {
17
    private final static String DATA_INFO_TYPE = "bulktagging::community";
18
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
19
    private final static String CLASS_ID = "bulktagging::community";
20
    private final static String SCHEMA_ID = "dnet:provenanceActions";
21
    private final static String COUNTER_GROUP = "Bulk Tagging";
22

    
23

    
24
    public OafProtos.Oaf enrichContext(final OafProtos.Oaf oaf, final CommunityConfiguration conf, final Mapper.Context context) {
25

    
26
        final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
27

    
28

    
29
        if(oaf.getDataInfo().getDeletedbyinference()){
30
            context.getCounter(COUNTER_GROUP, "deleted by inference").increment(1);
31
            return null;
32
        }
33

    
34

    
35
        final List<ResultProtos.Result.Context> contextList = oaf.getEntity().getResult().getMetadata().getContextList();
36

    
37
        //communities contains all the communities to be added as context for the result
38
        final Set<String> communities = new HashSet<>();
39
        List<FieldTypeProtos.StructuredProperty> subjectSet = oaf.getEntity().getResult().getMetadata().getSubjectList();
40
        communities.addAll(analiseSubjects(subjectSet, conf));
41

    
42
        List<Pair<String, String>> datasourceSet = oaf.getEntity().getResult().getInstanceList()
43
                .stream()
44
                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
45
                .collect(Collectors.toList());
46

    
47
        communities.addAll(analiseDatasource(datasourceSet, conf));
48

    
49
        //TODO: add code for Zenodo Communities
50

    
51

    
52
        final Map<String,Pair<Integer,ResultProtos.Result.Context>> contexts = new HashMap<>();
53

    
54
        for(int i = 0; i<contextList.size(); i++){
55
            contexts.put(contextList.get(i).getId(),new Pair<>(i,contextList.get(i)));
56
        }
57

    
58
        for(String contextId:communities){
59
            final Pair<Integer,ResultProtos.Result.Context> pair = contexts.get(contextId);
60
            ResultProtos.Result.Context c;
61
            if(pair == null)
62
                c = null;
63
             else
64
                 c = pair.getSnd();
65
            if (c != null) {
66
                // add new dataInfo if needed
67

    
68
                    Set<String> set = new HashSet<>();
69
                    set.addAll(c.getDataInfoList().stream().map(datainfo->datainfo.getInferenceprovenance()).collect(Collectors.toList()));
70
                    if (!set.contains(DATA_INFO_TYPE)) {
71
                        builder.getEntityBuilder().getResultBuilder().getMetadataBuilder().getContextBuilder(pair.getFst()).addDataInfo(buildDataInfo());
72

    
73
                       context.getCounter(COUNTER_GROUP, "add provenance").increment(1);
74
                    }
75

    
76

    
77
            } else {
78
                builder.getEntityBuilder().getResultBuilder().getMetadataBuilder().addContext(buildContext(contextId));
79

    
80
                context.getCounter(COUNTER_GROUP, "add context").increment(1);
81

    
82
            }
83
        }
84

    
85
        return builder.build();
86
    }
87

    
88
    private Set<String> analiseDatasource(List<Pair<String, String>> datasourceSet, final CommunityConfiguration conf) {
89
        Set<String> set = new HashSet<>();
90
        for(Pair<String,String> d: datasourceSet){
91
            if(!d.getFst().equalsIgnoreCase(d.getSnd())){
92
                set.addAll(conf.getCommunityForDatasourceValue(d.getFst().substring(d.getFst().indexOf("|")+1)));
93
            }
94
            set.addAll((conf.getCommunityForDatasourceValue(d.getSnd().substring(d.getSnd().indexOf("|")+1))));
95
        }
96
        return set;
97
    }
98

    
99
    private Set<String> analiseSubjects(List<FieldTypeProtos.StructuredProperty> subjectList, final CommunityConfiguration conf) {
100
        Set<String> set = new HashSet<>();
101
        for(FieldTypeProtos.StructuredProperty s:subjectList){
102
            set.addAll(conf.getCommunityForSubjectValue( s.getValue()));
103
        }
104

    
105
        return set;
106
    }
107

    
108
    private ResultProtos.Result.Context buildContext(final String contextId) {
109
        return ResultProtos.Result.Context.newBuilder()
110
                .setId(contextId)
111
                .addDataInfo(buildDataInfo())
112
                .build();
113
    }
114

    
115
    private FieldTypeProtos.DataInfo buildDataInfo() {
116
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
117
                .setInferred(true)
118
                .setProvenanceaction(
119
                        FieldTypeProtos.Qualifier.newBuilder()
120
                                .setClassid(CLASS_ID)
121
                                .setClassname("Bulk Tagging for Communities")
122
                                .setSchemeid(SCHEMA_ID)
123
                                .setSchemename(SCHEMA_NAME))
124
                .setInferenceprovenance(DATA_INFO_TYPE)
125
                .setTrust("0.85");
126
        return builder
127
                .build();
128
    }
129

    
130

    
131
}
(3-3/3)