Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import com.google.common.collect.Maps;
4
import com.google.protobuf.InvalidProtocolBufferException;
5
import com.googlecode.protobuf.format.JsonFormat;
6
import com.jayway.jsonpath.DocumentContext;
7
import com.jayway.jsonpath.JsonPath;
8
import eu.dnetlib.data.bulktag.CommunityConfiguration;
9
import eu.dnetlib.data.bulktag.Pair;
10
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
11
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
12
import eu.dnetlib.data.proto.FieldTypeProtos;
13
import eu.dnetlib.data.proto.OafProtos;
14
import eu.dnetlib.data.proto.ResultProtos;
15
import eu.dnetlib.data.proto.ResultProtos.Result.Context;
16
import org.apache.commons.lang3.StringUtils;
17
import org.apache.hadoop.hbase.client.Put;
18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19
import org.apache.hadoop.hbase.util.Bytes;
20
import org.apache.hadoop.mapreduce.Mapper;
21

    
22
import javax.rmi.CORBA.Util;
23
import java.util.*;
24
import java.util.stream.Collectors;
25
import java.util.stream.Stream;
26

    
27
import static eu.dnetlib.data.mapreduce.hbase.bulktag.TagginConstants.*;
28
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
29

    
30
/**
31
 * Created by miriam on 02/08/2018.
32
 */
33
public class ResultTagger {
34

    
35

    
36
    // Trust value written into every DataInfo produced by buildDataInfo; defaults to "0.8" and can be replaced through setTrust.
    private String trust = "0.8";
37

    
38

    
39
    private boolean clearContext(ResultProtos.Result.Metadata.Builder mBuilder){
40
        int tmp = mBuilder.getContextBuilderList().size();
41
        List<Context.Builder> clist = mBuilder.getContextBuilderList().stream()
42
                .filter(c -> (!c.getId().contains(ZENODO_COMMUNITY_INDICATOR))).collect(Collectors.toList());
43
        mBuilder.clearContext();
44
        clist.forEach(c->mBuilder.addContext(c));
45
        return (tmp != clist.size());
46
    }
47
    public Map<String,List<String>> getParamMap(final OafProtos.Oaf oaf, Map<String,String> params) {
48
        Map<String,List<String>> param = new HashMap<>();
49
        String json = JsonFormat.printToString(oaf);
50
        DocumentContext jsonContext = JsonPath.parse(json);
51
        if (params == null){
52
            params = new HashMap<>();
53
        }
54
        for(String key : params.keySet()) {
55
            try {
56
                param.put(key, jsonContext.read(params.get(key)));
57
            } catch (com.jayway.jsonpath.PathNotFoundException e) {
58
                param.put(key, new ArrayList<>());
59
               // throw e;
60
            }
61
        }
62
        return param;
63

    
64
    }
65

    
66

    
67
    public OafProtos.Oaf enrichContextCriteria(final OafProtos.Oaf oaf, final CommunityConfiguration conf, final Mapper.Context context, final Map<String,String> criteria) {
68
        final Map<String, List<String>> param = getParamMap(oaf, criteria);
69

    
70
        final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
71
        final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
72

    
73

    
74
        //Verify if the entity is deletedbyinference. In case verify if to clean the context list from all the zenodo communities
75
        if(oaf.getDataInfo().getDeletedbyinference()){
76
            context.getCounter(COUNTER_GROUP, "deleted by inference").increment(1);
77
            if(!clearContext(mBuilder))
78
                return null;
79
            context.getCounter(COUNTER_GROUP,"removed zenodo community context in del by inference record").increment(1);
80
            return builder.build();
81
        }
82

    
83
        //communities contains all the communities to be added as context for the result
84
        final Set<String> communities = new HashSet<>();
85

    
86

    
87
        //tagging for Subject
88
        final Set<String> subjects = new HashSet<>();
89
        oaf.getEntity().getResult().getMetadata().getSubjectList().stream()
90
                .map(subject -> subject.getValue())
91
                .filter(StringUtils::isNotBlank)
92
                .map(String::toLowerCase)
93
                .map(String::trim)
94
                .collect(Collectors.toCollection(HashSet::new))
95
                .forEach(s -> subjects.addAll(conf.getCommunityForSubjectValue(s)));
96

    
97
        //updating counters for subject matching
98
        subjects.forEach(c->context.getCounter(COUNTER_GROUP, "Matching subject for community " + c).increment(1));
99
        communities.addAll(subjects);
100
        context.getCounter(COUNTER_GROUP,"match found for subjects ").increment(subjects.size());
101

    
102
        //Tagging for datasource
103
        final Set<String> datasources = new HashSet<>();
104
        final Set<String> tmp = new HashSet<>();
105
        for(ResultProtos.Result.Instance i : oaf.getEntity().getResult().getInstanceList()){
106
            tmp.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(),"|"));
107
            tmp.add(StringUtils.substringAfter(i.getHostedby().getKey(),"|"));
108
        }
109

    
110
        oaf.getEntity().getResult().getInstanceList()
111
                .stream()
112
                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
113
                .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
114
                .map(s -> StringUtils.substringAfter(s, "|"))
115
                .collect(Collectors.toCollection(HashSet::new))
116
                .forEach(dsId -> datasources.addAll(conf.getCommunityForDatasource(dsId,param)));
117
        //updating counters for datasource matching
118
        datasources.forEach(c->context.getCounter(COUNTER_GROUP,"Matching datasource for community " + c).increment(1));
119
        communities.addAll(datasources);
120
        context.getCounter(COUNTER_GROUP,"Match found for content providers " ).increment(datasources.size());
121

    
122
        /*Tagging for Zenodo Communities*/
123
        final Set<String> czenodo = new HashSet<>();
124
        //final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
125
        mBuilder.getContextBuilderList().stream().filter(cBuilder -> cBuilder.getId().contains(ZENODO_COMMUNITY_INDICATOR))
126
                .collect(Collectors.toList())
127
                .forEach(c->czenodo.addAll(conf.getCommunityForZenodoCommunityValue(c.getId().substring(c.getId().lastIndexOf("/")+1).trim())));
128
        czenodo.forEach(c->context.getCounter(COUNTER_GROUP,"Matching Zenodo community for community " + c).increment(1));
129
        //updating counters for Zenodo communities matching
130
        context.getCounter(COUNTER_GROUP,"Match found for Zenodo communities " ).increment(czenodo.size());
131
        communities.addAll(czenodo);
132

    
133
        boolean removed = clearContext(mBuilder);
134

    
135
        /*Verify if there is something to bulktag*/
136
        if(communities.isEmpty()){
137
            context.getCounter(COUNTER_GROUP, "list of communities empty").increment(1);
138
//            return null;
139
            if (!removed)
140
                return null;
141
            else {
142
                context.getCounter(COUNTER_GROUP,"removed Zenodo community from result not to be enriched").increment(1);
143
                return builder.build();
144
            }
145
        }else{
146
            context.getCounter(COUNTER_GROUP, "list of communities has values!").increment(1);
147
        }
148

    
149
        final Map<String, ResultProtos.Result.Context.Builder> cBuilders = Maps.newHashMap();
150
        mBuilder.getContextBuilderList().forEach(cBuilder -> {
151
            cBuilders.put(cBuilder.getId(), cBuilder);
152
        });
153

    
154
        /*Applies actual tagging*/
155
        for(String contextId:communities){
156
            ResultProtos.Result.Context.Builder cBuilder = cBuilders.get(contextId);
157
            if (cBuilder != null) {
158
                if (!cBuilder.getDataInfoBuilderList().stream()
159
                        .map(di -> di.getInferenceprovenance())
160
                        .anyMatch(s -> DATA_INFO_TYPE.equals(s))) {
161
                    if (subjects.contains(contextId))
162
                        cBuilder.addDataInfo(buildDataInfo(CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT));
163
                    if(datasources.contains(contextId))
164
                        cBuilder.addDataInfo(buildDataInfo(CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE));
165
                    if(czenodo.contains(contextId))
166
                        cBuilder.addDataInfo(buildDataInfo(CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO));
167
                    context.getCounter(COUNTER_GROUP, "add provenance").increment(1);
168
                } else {
169
                    context.getCounter(COUNTER_GROUP, "provenance already bulk tagged").increment(1);
170
                }
171
            } else {
172
                context.getCounter(COUNTER_GROUP, "add context").increment(1);
173
                cBuilder = Context.newBuilder().setId(contextId);
174

    
175
                if (subjects.contains(contextId))
176
                    cBuilder.addDataInfo(buildDataInfo(CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT));
177
                if(datasources.contains(contextId))
178
                    cBuilder.addDataInfo(buildDataInfo(CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE));
179
                if(czenodo.contains(contextId))
180
                    cBuilder.addDataInfo(buildDataInfo(CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO));
181
                mBuilder.addContext(cBuilder.build());
182
            }
183
        }
184

    
185
        return builder.build();
186
    }
187

    
188
    public OafProtos.Oaf enrichContext(final OafProtos.Oaf oaf, final CommunityConfiguration conf, final Mapper.Context context) {
189

    
190
        final OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
191
        final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
192

    
193
        /*
194
        * Verify if the entity is deletedbyinference. In case verify if to clean the context list from all the zenodo communities
195
        * */
196
        if(oaf.getDataInfo().getDeletedbyinference()){
197
            context.getCounter(COUNTER_GROUP, "deleted by inference").increment(1);
198
            if(!clearContext(mBuilder))
199
                return null;
200
            context.getCounter(COUNTER_GROUP,"removed zenodo community context in del by inference record").increment(1);
201
            return builder.build();
202
        }
203

    
204
        //communities contains all the communities to be added as context for the result
205
        final Set<String> communities = new HashSet<>();
206

    
207
        /*
208
        *Tagging for Subject
209
        * */
210
        final Set<String> subjects = new HashSet<>();
211
        oaf.getEntity().getResult().getMetadata().getSubjectList().stream()
212
                .map(subject -> subject.getValue())
213
                .filter(StringUtils::isNotBlank)
214
                .map(String::toLowerCase)
215
                .map(String::trim)
216
                .collect(Collectors.toCollection(HashSet::new))
217
                .forEach(s -> subjects.addAll(conf.getCommunityForSubjectValue(s)));
218

    
219
        //updating counters for subject matching
220
        subjects.forEach(c->context.getCounter(COUNTER_GROUP, "Matching subject for community " + c).increment(1));
221
        communities.addAll(subjects);
222
        context.getCounter(COUNTER_GROUP,"match found for subjects ").increment(subjects.size());
223

    
224
        /*Tagging for datasource*/
225
        final Set<String> datasources = new HashSet<>();
226
        final Set<String> tmp = new HashSet<>();
227
        for(ResultProtos.Result.Instance i : oaf.getEntity().getResult().getInstanceList()){
228
            tmp.add(StringUtils.substringAfter(i.getCollectedfrom().getKey(),"|"));
229
            tmp.add(StringUtils.substringAfter(i.getHostedby().getKey(),"|"));
230
        }
231

    
232
        oaf.getEntity().getResult().getInstanceList()
233
                .stream()
234
                .map(i -> new Pair<>(i.getCollectedfrom().getKey(), i.getHostedby().getKey()))
235
                .flatMap(p -> Stream.of(p.getFst(), p.getSnd()))
236
                .map(s -> StringUtils.substringAfter(s, "|"))
237
                .collect(Collectors.toCollection(HashSet::new))
238
                .forEach(dsId -> datasources.addAll(conf.getCommunityForDatasourceValue(dsId)));
239
        //updating counters for datasource matching
240
        datasources.forEach(c->context.getCounter(COUNTER_GROUP,"Matching datasource for community " + c).increment(1));
241
        communities.addAll(datasources);
242
        context.getCounter(COUNTER_GROUP,"Match found for content providers " ).increment(datasources.size());
243

    
244
        /*Tagging for Zenodo Communities*/
245
        final Set<String> czenodo = new HashSet<>();
246
        //final ResultProtos.Result.Metadata.Builder mBuilder = builder.getEntityBuilder().getResultBuilder().getMetadataBuilder();
247
        mBuilder.getContextBuilderList().stream().filter(cBuilder -> cBuilder.getId().contains(ZENODO_COMMUNITY_INDICATOR))
248
                .collect(Collectors.toList())
249
                .forEach(c->czenodo.addAll(conf.getCommunityForZenodoCommunityValue(c.getId().substring(c.getId().lastIndexOf("/")+1).trim())));
250
        czenodo.forEach(c->context.getCounter(COUNTER_GROUP,"Matching Zenodo community for community " + c).increment(1));
251
        //updating counters for Zenodo communities matching
252
        context.getCounter(COUNTER_GROUP,"Match found for Zenodo communities " ).increment(czenodo.size());
253
        communities.addAll(czenodo);
254

    
255
        boolean removed = clearContext(mBuilder);
256

    
257
        /*Verify if there is something to bulktag*/
258
        if(communities.isEmpty()){
259
            context.getCounter(COUNTER_GROUP, "list of communities empty").increment(1);
260
//            return null;
261
            if (!removed)
262
                return null;
263
            else {
264
                context.getCounter(COUNTER_GROUP,"removed Zenodo community from result not to be enriched").increment(1);
265
                return builder.build();
266
            }
267
        }else{
268
            context.getCounter(COUNTER_GROUP, "list of communities has values!").increment(1);
269
        }
270

    
271
        final Map<String, ResultProtos.Result.Context.Builder> cBuilders = Maps.newHashMap();
272
        mBuilder.getContextBuilderList().forEach(cBuilder -> {
273
            cBuilders.put(cBuilder.getId(), cBuilder);
274
        });
275

    
276
        /*Applies actual tagging*/
277
        for(String contextId:communities){
278
            ResultProtos.Result.Context.Builder cBuilder = cBuilders.get(contextId);
279
            if (cBuilder != null) {
280
                if (!cBuilder.getDataInfoBuilderList().stream()
281
                        .map(di -> di.getInferenceprovenance())
282
                        .anyMatch(s -> DATA_INFO_TYPE.equals(s))) {
283
                    if (subjects.contains(contextId))
284
                        cBuilder.addDataInfo(buildDataInfo(CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT));
285
                    if(datasources.contains(contextId))
286
                        cBuilder.addDataInfo(buildDataInfo(CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE));
287
                    if(czenodo.contains(contextId))
288
                        cBuilder.addDataInfo(buildDataInfo(CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO));
289
                    context.getCounter(COUNTER_GROUP, "add provenance").increment(1);
290
                } else {
291
                    context.getCounter(COUNTER_GROUP, "provenance already bulk tagged").increment(1);
292
                }
293
            } else {
294
                context.getCounter(COUNTER_GROUP, "add context").increment(1);
295
                cBuilder = Context.newBuilder().setId(contextId);
296

    
297
                if (subjects.contains(contextId))
298
                    cBuilder.addDataInfo(buildDataInfo(CLASS_ID_SUBJECT, CLASS_NAME_BULKTAG_SUBJECT));
299
                if(datasources.contains(contextId))
300
                    cBuilder.addDataInfo(buildDataInfo(CLASS_ID_DATASOURCE, CLASS_NAME_BULKTAG_DATASOURCE));
301
                if(czenodo.contains(contextId))
302
                    cBuilder.addDataInfo(buildDataInfo(CLASS_ID_CZENODO, CLASS_NAME_BULKTAG_ZENODO));
303
                mBuilder.addContext(cBuilder.build());
304
            }
305
        }
306

    
307
        return builder.build();
308
    }
309

    
310

    
311

    
312
    private FieldTypeProtos.DataInfo buildDataInfo(String class_id, String class_name) {
313
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
314
                .setInferred(true)
315
                .setProvenanceaction(
316
                        FieldTypeProtos.Qualifier.newBuilder()
317
                                .setClassid(class_id)
318
                                .setClassname(class_name)
319
                                .setSchemeid(SCHEMA_ID)
320
                                .setSchemename(SCHEMA_NAME))
321
                .setInferenceprovenance(DATA_INFO_TYPE)
322
                .setTrust(trust);
323
        return builder
324
                .build();
325
    }
326

    
327

    
328
    public void setTrust(String s) {
329
        trust = s;
330
    }
331
}
(3-3/4)