Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult;
2

    
3
import com.google.common.collect.Maps;
4
import com.google.gson.Gson;
5
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
6
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.proto.OafProtos;
8
import eu.dnetlib.data.proto.ResultProtos;
9
import eu.dnetlib.data.proto.TypeProtos;
10
import org.apache.hadoop.hbase.client.Delete;
11
import org.apache.hadoop.hbase.client.Result;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableMapper;
14
import org.apache.hadoop.hbase.util.Bytes;
15
import org.apache.hadoop.io.Text;
16

    
17
import java.io.IOException;
18
import java.util.HashSet;
19
import java.util.Map;
20
import java.util.NavigableMap;
21
import java.util.Set;
22
import java.util.stream.Collectors;
23

    
24
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
25
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.DEFAULT_COMMUNITY_RELATION_SET;
26
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
27
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
28

    
29

    
30
public class CommunityToResultMapper extends TableMapper<Text, Text> {
31

    
32
    private Text keyOut;
33
    private Text valueOut;
34
    private String[] sem_rels;
35
    private String trust;
36
    CommunityList idCommunityList;
37

    
38
    @Override
39
    protected void setup(final Context context) throws IOException, InterruptedException {
40

    
41
        idCommunityList = new Gson().fromJson(context.getConfiguration().get("community.id.list"), CommunityList.class);
42
        keyOut = new Text();
43
        valueOut = new Text();
44

    
45
        sem_rels = context.getConfiguration().getStrings("propagatetocommunity.semanticrelations", DEFAULT_COMMUNITY_RELATION_SET);
46
        trust = context.getConfiguration().get("propagatetocommunity.trust","0.85");
47

    
48
    }
49

    
50
    @Override
51
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
52

    
53
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
54

    
55
        //If the type is not result I do not need to process it
56
        if (!type.equals(TypeProtos.Type.result)) {
57
            return;
58
        }
59

    
60
        //verify if entity is valid
61
        final OafProtos.OafEntity entity = getEntity(value, type);
62
        if (entity == null) {
63
            context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1);
64
            return;
65
        }
66
        final Set<String> toemitrelations = new HashSet<>();
67
        //verify if we have some relation
68
        for (String sem_rel : sem_rels)
69
            toemitrelations.addAll(getRelationTarget(value, sem_rel, context, COUNTER_PROPAGATION));
70

    
71
        if (toemitrelations.isEmpty()) {
72
            context.getCounter(COUNTER_PROPAGATION, "No allowed semantic relation present in result").increment(1);
73
            return;
74
        }
75

    
76
        //verify if we have a relation to a context in the body
77
        Set<String> contextIds = entity.getResult().getMetadata().getContextList()
78
                .stream()
79
                .map(ResultProtos.Result.Context::getId)
80
                .collect(Collectors.toSet());
81

    
82
        //verify if we have a relation to a context in the update part made by the inference
83
        NavigableMap<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(TypeProtos.Type.result.toString()));
84

    
85
        final Map<String, byte[]> stringMap = Maps.newHashMap();
86
        for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
87
            stringMap.put(Bytes.toString(e.getKey()), e.getValue());
88
        }
89

    
90
        // we fetch all the body updates
91
        for (final String o : stringMap.keySet()) {
92
            if (o.startsWith("update_")) {
93
                final OafProtos.Oaf update = OafProtos.Oaf.parseFrom(stringMap.get(o));
94
                contextIds.addAll(update.getEntity().getResult().getMetadata().getContextList()
95
                        .stream()
96
                        .map(ResultProtos.Result.Context::getId)
97
                        .map(s -> s.split("::")[0])
98
                        .collect(Collectors.toSet()));
99
            }
100
        }
101

    
102
        //we verify if we have something
103
        if (contextIds.isEmpty()) {
104
            context.getCounter(COUNTER_PROPAGATION, "No context in the body and in the update of the result").increment(1);
105
            return;
106
        }
107

    
108
        //verify if some of the context collected for the result are associated to a community in the communityIdList
109
        for (String id : idCommunityList) {
110
            if (contextIds.contains(id)) {
111
                for (String target : toemitrelations) {
112
                    keyOut.set(target);
113
                    valueOut.set(Value.newInstance(id).setTrust(trust).toJson());
114
                    context.write(keyOut, valueOut);
115
                    context.getCounter(COUNTER_PROPAGATION, "Emit propagation for " + id).increment(1);
116
                }
117
            }
118
        }
119

    
120
    }
121

    
122
}
123

    
(3-3/4)