Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import com.google.common.base.Functions;
4
import com.google.common.collect.Maps;
5
import com.google.common.collect.Sets;
6
import eu.dnetlib.data.bulktag.CommunityConfiguration;
7
import eu.dnetlib.data.bulktag.Pair;
8
import eu.dnetlib.data.proto.FieldTypeProtos;
9
import eu.dnetlib.data.proto.OafProtos;
10
import eu.dnetlib.data.proto.ResultProtos;
11
import eu.dnetlib.data.proto.ResultProtos.Result.Context;
12
import org.apache.commons.lang3.StringUtils;
13
import org.apache.hadoop.mapreduce.Mapper;
14

    
15
import java.util.*;
16
import java.util.stream.Collectors;
17
import java.util.stream.Stream;
18

    
19
/**
20
 * Created by miriam on 02/08/2018.
21
 */
22
public class ResultTagger {
23
    private final static String DATA_INFO_TYPE = "bulktagging";
24
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
25
    private final static String CLASS_ID = "bulktagging::community";
26
    private final static String SCHEMA_ID = "dnet:provenanceActions";
27
    private final static String COUNTER_GROUP = "Bulk Tagging";
28

    
29
    private final static String ZENODO_COMMUNITY_INDICATOR = "https://zenodo.org/communities/";
30

    
31
    private String trust = "0.8";
32

    
33

    
34
    public OafProtos.Oaf enrichContext(final OafProtos.Oaf oaf, final CommunityConfiguration conf, final Mapper.Context context) {
35

    
36
        //context.getCounter(COUNTER_GROUP, "to enrich").increment(1);
37
        final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
38

    
39

    
40
        if(oaf.getDataInfo().getDeletedbyinference()){
41
            context.getCounter(COUNTER_GROUP, "deleted by inference").increment(1);
42
            return null;
43
        }
44
        //context.getCounter(COUNTER_GROUP, "not deleted by inference").increment(1);
45

    
46
        final List<ResultProtos.Result.Context> contextList = oaf.getEntity().getResult().getMetadata().getContextList();
47

    
48
        if(contextList.size()>0){
49
            context.getCounter(COUNTER_GROUP, "exist context list").increment(1);
50
        }else{
51
            context.getCounter(COUNTER_GROUP, "not exist context list").increment(1);
52
        }
53
        //communities contains all the communities to be added as context for the result
54
        final Set<String> communities = new HashSet<>();
55
        final Set<String> subjects = new HashSet<>();
56
        oaf.getEntity().getResult().getMetadata().getSubjectList().stream()
57
                .map(subject -> subject.getValue())
58
                .filter(StringUtils::isNotBlank)
59
                .map(String::toLowerCase)
60
                .map(String::trim)
61
                .collect(Collectors.toCollection(HashSet::new))
62
                .forEach(s -> subjects.addAll(conf.getCommunityForSubjectValue(s)));
63

    
64
        //updating counters for subject matching
65
        subjects.forEach(c->context.getCounter(COUNTER_GROUP, "Matching subject for community " + c).increment(1));
66
        communities.addAll(subjects);
67
        context.getCounter(COUNTER_GROUP,"match found for subjects ").increment(subjects.size());
68
        final Set<String> datasources = new HashSet<>();
69
        oaf.getEntity().getResult().getInstanceList()
70
                .stream()
71
                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
72
                .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
73
                .map(s -> StringUtils.substringAfter(s, "|"))
74
                .collect(Collectors.toCollection(HashSet::new))
75
                .forEach(dsId -> datasources.addAll(conf.getCommunityForDatasourceValue(dsId)));
76

    
77
        datasources.forEach(c->context.getCounter(COUNTER_GROUP,"Matching datasource for community " + c).increment(1));
78
        communities.addAll(datasources);
79
        context.getCounter(COUNTER_GROUP,"Match found for content providers " ).increment(datasources.size());
80

    
81
        final Set<String> czenodo = new HashSet<>();
82
        final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
83
        mBuilder.getContextBuilderList().stream().filter(cBuilder -> cBuilder.getId().contains(ZENODO_COMMUNITY_INDICATOR))
84
                .collect(Collectors.toList())
85
                .forEach(c->czenodo.addAll(conf.getCommunityForZenodoCommunityValue(c.getId().substring(c.getId().lastIndexOf("/")+1).trim())));
86
        czenodo.forEach(c->context.getCounter(COUNTER_GROUP,"Matching Zenodo community for community " + c).increment(1));
87
        context.getCounter(COUNTER_GROUP,"Match found for Zenodo communities " ).increment(czenodo.size());
88

    
89
        communities.addAll(czenodo);
90
        List<Context.Builder> clist = mBuilder.getContextBuilderList().stream()
91
                .filter(c -> (!c.getId().contains(ZENODO_COMMUNITY_INDICATOR))).collect(Collectors.toList());
92

    
93
        mBuilder.clearContext();
94
        clist.forEach(c->mBuilder.addContext(c));
95

    
96

    
97
        final Map<String, ResultProtos.Result.Context.Builder> cBuilders = Maps.newHashMap();
98

    
99

    
100
        if(communities.isEmpty()){
101
            context.getCounter(COUNTER_GROUP, "list of communities empty").increment(1);
102
        }else{
103
            context.getCounter(COUNTER_GROUP, "list of communities has values!").increment(1);
104
        }
105

    
106
        mBuilder.getContextBuilderList().forEach(cBuilder -> {
107
            cBuilders.put(cBuilder.getId(), cBuilder);
108
        });
109

    
110
        for(String contextId:communities){
111

    
112
            final ResultProtos.Result.Context.Builder cBuilder = cBuilders.get(contextId);
113
            if (cBuilder != null) {
114

    
115
                if (!cBuilder.getDataInfoBuilderList().stream()
116
                        .map(di -> di.getInferenceprovenance())
117
                        .anyMatch(s -> DATA_INFO_TYPE.equals(s))) {
118

    
119
                    cBuilder.addDataInfo(buildDataInfo());
120
                    context.getCounter(COUNTER_GROUP, "add provenance").increment(1);
121
                } else {
122
                    context.getCounter(COUNTER_GROUP, "provenance already bulk tagged").increment(1);
123
                }
124
            } else {
125
                context.getCounter(COUNTER_GROUP, "add context").increment(1);
126
                mBuilder.addContext(buildContext(contextId));
127
            }
128

    
129
        }
130

    
131
        return builder.build();
132
    }
133

    
134
    private ResultProtos.Result.Context buildContext(final String contextId) {
135
        return ResultProtos.Result.Context.newBuilder()
136
                .setId(contextId)
137
                .addDataInfo(buildDataInfo())
138
                .build();
139
    }
140

    
141
    private FieldTypeProtos.DataInfo buildDataInfo() {
142
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
143
                .setInferred(true)
144
                .setProvenanceaction(
145
                        FieldTypeProtos.Qualifier.newBuilder()
146
                                .setClassid(CLASS_ID)
147
                                .setClassname("Bulk Tagging for Communities")
148
                                .setSchemeid(SCHEMA_ID)
149
                                .setSchemename(SCHEMA_NAME))
150
                .setInferenceprovenance(DATA_INFO_TYPE)
151
                .setTrust(trust);
152
        return builder
153
                .build();
154
    }
155

    
156

    
157
    public void setTrust(String s) {
158
        trust = s;
159
    }
160
}
(2-2/2)