Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult;
2

    
3
import com.google.common.collect.Lists;
4
import com.google.common.collect.Maps;
5
import com.google.common.reflect.TypeToken;
6
import com.google.gson.Gson;
7
import com.google.protobuf.InvalidProtocolBufferException;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
9
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
12
import eu.dnetlib.data.proto.FieldTypeProtos;
13
import eu.dnetlib.data.proto.OafProtos;
14
import eu.dnetlib.data.proto.ResultProtos;
15
import eu.dnetlib.data.proto.TypeProtos;
16

    
17
import org.apache.hadoop.hbase.client.Result;
18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19
import org.apache.hadoop.hbase.mapreduce.TableMapper;
20
import org.apache.hadoop.hbase.util.Bytes;
21
import org.apache.hadoop.io.Text;
22

    
23
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
24
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
25
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
26

    
27
import java.io.IOException;
28
import java.lang.reflect.Type;
29
import java.util.*;
30
import java.util.stream.Collectors;
31

    
32

    
33
public class CommunityToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
34

    
35
    private ImmutableBytesWritable keyOut;
36
    private Text valueOut;
37
    private String[] sem_rels;
38
    private String trust;
39
    CommunityList idCommunityList;
40

    
41
    @Override
42
    protected void setup(final Context context) throws IOException, InterruptedException {
43

    
44
        idCommunityList = new Gson().fromJson(context.getConfiguration().get("community.id.list"), CommunityList.class);
45
        keyOut = new ImmutableBytesWritable();
46
        valueOut = new Text();
47

    
48
        sem_rels = context.getConfiguration().getStrings("propagatetocommunity.semanticrelations", DEFAULT_COMMUNITY_RELATION_SET);
49
        trust = context.getConfiguration().get("propagatetocommunity.trust","0.85");
50

    
51
    }
52

    
53
    @Override
54
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
55
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
56
        //If the type is not result I do not need to process it
57
        if (!type.equals(TypeProtos.Type.result)) {
58
            return;
59
        }
60

    
61

    
62
        //verify if entity is valid
63
        final OafProtos.OafEntity entity = getEntity(value, type);
64
        if (entity == null) {
65
            context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1);
66
            return;
67
        }
68
        final Set<String> toemitrelations = new HashSet<>();
69
        //verify if we have some relation
70
        for (String sem_rel : sem_rels)
71
            toemitrelations.addAll(getRelationTarget(value, sem_rel, context, COUNTER_PROPAGATION));
72

    
73
        if (toemitrelations.isEmpty()) {
74
            context.getCounter(COUNTER_PROPAGATION, "No allowed semantic relation present in result").increment(1);
75
            return;
76
        }
77

    
78
        //verify if we have a relation to a context in the body
79
        Set<String> contextIds = entity.getResult().getMetadata().getContextList()
80
                .stream()
81
                .map(ResultProtos.Result.Context::getId)
82
                .collect(Collectors.toSet());
83

    
84
        //verify if we have a relation to a context in the update part made by the inference
85
        NavigableMap<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(TypeProtos.Type.result.toString()));
86

    
87
        final Map<String, byte[]> stringMap = Maps.newHashMap();
88
        for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
89
            stringMap.put(Bytes.toString(e.getKey()), e.getValue());
90
        }
91

    
92
        // we fetch all the body updates
93
        for (final String o : stringMap.keySet()) {
94
            if (o.startsWith("update_")) {
95
                final OafProtos.Oaf update = OafProtos.Oaf.parseFrom(stringMap.get(o));
96
                contextIds.addAll(update.getEntity().getResult().getMetadata().getContextList()
97
                        .stream()
98
                        .map(ResultProtos.Result.Context::getId)
99
                        .map(s -> s.split("::")[0])
100
                        .collect(Collectors.toSet()));
101
            }
102
        }
103

    
104
        //we verify if we have something
105
        if (contextIds.isEmpty()) {
106
            context.getCounter(COUNTER_PROPAGATION, "No context in the body and in the update of the result").increment(1);
107
            return;
108
        }
109

    
110
        //verify if some of the context collected for the result are associated to a community in the communityIdList
111
        for (String id : idCommunityList) {
112
            if (contextIds.contains(id)) {
113
                for (String target : toemitrelations) {
114
                    keyOut.set(target.getBytes());
115
                    valueOut.set(Value.newInstance(id).setTrust(trust).toJson());
116
                    context.write(keyOut, valueOut);
117
                    context.getCounter(COUNTER_PROPAGATION, "Emit propagation for " + id).increment(1);
118
                }
119
            }
120
        }
121

    
122
    }
123

    
124
}
125

    
(3-3/4)