Project

General

Profile

1 52919 miriam.bag
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2
3 52952 miriam.bag
import com.google.common.base.Functions;
4
import com.google.common.collect.Maps;
5
import com.google.common.collect.Sets;
6 52919 miriam.bag
import eu.dnetlib.data.bulktag.CommunityConfiguration;
7
import eu.dnetlib.data.bulktag.Pair;
8
import eu.dnetlib.data.proto.FieldTypeProtos;
9
import eu.dnetlib.data.proto.OafProtos;
10
import eu.dnetlib.data.proto.ResultProtos;
11 52952 miriam.bag
import org.apache.commons.lang3.StringUtils;
12 52919 miriam.bag
import org.apache.hadoop.mapreduce.Mapper;
13
14
import java.util.*;
15
import java.util.stream.Collectors;
16 52952 miriam.bag
import java.util.stream.Stream;
17 52919 miriam.bag
18
/**
19
 * Created by miriam on 02/08/2018.
20
 */
21
public class ResultTagger {
22 53279 miriam.bag
    private final static String DATA_INFO_TYPE = "bulktagging";
23 52919 miriam.bag
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
24
    private final static String CLASS_ID = "bulktagging::community";
25
    private final static String SCHEMA_ID = "dnet:provenanceActions";
26
    private final static String COUNTER_GROUP = "Bulk Tagging";
27
28 53279 miriam.bag
    private String trust = "0.8";
29 52919 miriam.bag
30 53279 miriam.bag
31 52919 miriam.bag
    public OafProtos.Oaf enrichContext(final OafProtos.Oaf oaf, final CommunityConfiguration conf, final Mapper.Context context) {
32
33 52952 miriam.bag
        //context.getCounter(COUNTER_GROUP, "to enrich").increment(1);
34 52919 miriam.bag
        final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
35
36 52942 miriam.bag
37 52943 miriam.bag
        if(oaf.getDataInfo().getDeletedbyinference()){
38
            context.getCounter(COUNTER_GROUP, "deleted by inference").increment(1);
39 52942 miriam.bag
            return null;
40 52943 miriam.bag
        }
41 52952 miriam.bag
        //context.getCounter(COUNTER_GROUP, "not deleted by inference").increment(1);
42 52942 miriam.bag
43
        final List<ResultProtos.Result.Context> contextList = oaf.getEntity().getResult().getMetadata().getContextList();
44
45 52952 miriam.bag
        if(contextList.size()>0){
46
            context.getCounter(COUNTER_GROUP, "exist context list").increment(1);
47
        }else{
48
            context.getCounter(COUNTER_GROUP, "not exist context list").increment(1);
49
        }
50 52942 miriam.bag
        //communities contains all the communities to be added as context for the result
51
        final Set<String> communities = new HashSet<>();
52
53 52952 miriam.bag
        oaf.getEntity().getResult().getMetadata().getSubjectList().stream()
54
                .map(subject -> subject.getValue())
55
                .filter(StringUtils::isNotBlank)
56
                .map(String::toLowerCase)
57
                .map(String::trim)
58
                .collect(Collectors.toCollection(HashSet::new))
59
                .forEach(s -> communities.addAll(conf.getCommunityForSubjectValue(s)));
60
61
        oaf.getEntity().getResult().getInstanceList()
62 52942 miriam.bag
                .stream()
63
                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
64 52952 miriam.bag
                .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
65
                .map(s -> StringUtils.substringAfter(s, "|"))
66
                .collect(Collectors.toCollection(HashSet::new))
67
                .forEach(dsId -> communities.addAll(conf.getCommunityForDatasourceValue(dsId)));
68 52942 miriam.bag
69
        //TODO: add code for Zenodo Communities
70
71 52952 miriam.bag
        if(communities.isEmpty()){
72
            context.getCounter(COUNTER_GROUP, "list of communities empty").increment(1);
73
        }else{
74
            context.getCounter(COUNTER_GROUP, "list of communities has values!").increment(1);
75
        }
76 52942 miriam.bag
77 52952 miriam.bag
        final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
78 52942 miriam.bag
79 52952 miriam.bag
        final Map<String, ResultProtos.Result.Context.Builder> cBuilders = Maps.newHashMap();
80
        mBuilder.getContextBuilderList().forEach(cBuilder -> {
81
            cBuilders.put(cBuilder.getId(), cBuilder);
82
        });
83 52942 miriam.bag
84
        for(String contextId:communities){
85
86 52952 miriam.bag
            final ResultProtos.Result.Context.Builder cBuilder = cBuilders.get(contextId);
87
            if (cBuilder != null) {
88 52942 miriam.bag
89 52952 miriam.bag
                if (!cBuilder.getDataInfoBuilderList().stream()
90
                        .map(di -> di.getInferenceprovenance())
91
                        .anyMatch(s -> DATA_INFO_TYPE.equals(s))) {
92 52942 miriam.bag
93 52952 miriam.bag
                    cBuilder.addDataInfo(buildDataInfo());
94
                    context.getCounter(COUNTER_GROUP, "add provenance").increment(1);
95
                } else {
96
                    context.getCounter(COUNTER_GROUP, "provenance already bulk tagged").increment(1);
97
                }
98 52942 miriam.bag
            } else {
99
                context.getCounter(COUNTER_GROUP, "add context").increment(1);
100 52952 miriam.bag
                mBuilder.addContext(buildContext(contextId));
101
            }
102 52942 miriam.bag
103
        }
104
105
        return builder.build();
106 52919 miriam.bag
    }
107
108
    private ResultProtos.Result.Context buildContext(final String contextId) {
109
        return ResultProtos.Result.Context.newBuilder()
110
                .setId(contextId)
111
                .addDataInfo(buildDataInfo())
112
                .build();
113
    }
114
115
    private FieldTypeProtos.DataInfo buildDataInfo() {
116
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
117
                .setInferred(true)
118
                .setProvenanceaction(
119
                        FieldTypeProtos.Qualifier.newBuilder()
120
                                .setClassid(CLASS_ID)
121
                                .setClassname("Bulk Tagging for Communities")
122
                                .setSchemeid(SCHEMA_ID)
123
                                .setSchemename(SCHEMA_NAME))
124
                .setInferenceprovenance(DATA_INFO_TYPE)
125 53279 miriam.bag
                .setTrust(trust);
126 52919 miriam.bag
        return builder
127
                .build();
128
    }
129
130
131 53279 miriam.bag
    public void setTrust(String s) {
132
        trust = s;
133
    }
134
}