Project

General

Profile

1 56083 miriam.bag
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult;
2
3 56244 miriam.bag
import com.google.common.collect.Lists;
4
import com.google.common.collect.Maps;
5
import com.google.common.reflect.TypeToken;
6
import com.google.gson.Gson;
7
import com.google.protobuf.InvalidProtocolBufferException;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
9 56083 miriam.bag
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
10 56244 miriam.bag
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11 56083 miriam.bag
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
12
import eu.dnetlib.data.proto.FieldTypeProtos;
13
import eu.dnetlib.data.proto.OafProtos;
14
import eu.dnetlib.data.proto.ResultProtos;
15
import eu.dnetlib.data.proto.TypeProtos;
16 56244 miriam.bag
17 56083 miriam.bag
import org.apache.hadoop.hbase.client.Result;
18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19
import org.apache.hadoop.hbase.mapreduce.TableMapper;
20 56244 miriam.bag
import org.apache.hadoop.hbase.util.Bytes;
21 56083 miriam.bag
import org.apache.hadoop.io.Text;
22 56244 miriam.bag
23 56083 miriam.bag
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
24 56244 miriam.bag
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
25
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
26 56083 miriam.bag
27
import java.io.IOException;
28 56244 miriam.bag
import java.lang.reflect.Type;
29
import java.util.*;
30
import java.util.stream.Collectors;
31 56083 miriam.bag
32
33
public class CommunityToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
34
35
    private ImmutableBytesWritable keyOut;
36
    private Text valueOut;
37
    private String[] sem_rels;
38
    private String trust;
39 56244 miriam.bag
    List<String> idCommunityList;
40 56083 miriam.bag
41
    @Override
42
    protected void setup(final Context context) throws IOException, InterruptedException {
43 56244 miriam.bag
        Type listType = new TypeToken<List<String>>() {}.getType();
44 56083 miriam.bag
45 56244 miriam.bag
        idCommunityList = new Gson().fromJson(context.getConfiguration().get("community.id.list"),listType);
46 56083 miriam.bag
        keyOut = new ImmutableBytesWritable();
47
        valueOut = new Text();
48
49
        sem_rels = context.getConfiguration().getStrings("propagatetocommunity.semanticrelations", DEFAULT_COMMUNITY_RELATION_SET);
50
        trust = context.getConfiguration().get("propagatetocommunity.trust","0.85");
51
52
    }
53
54
    @Override
55
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
56
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
57
        //If the type is not result I do not need to process it
58 56244 miriam.bag
        if (!type.equals(TypeProtos.Type.result)) {
59 56083 miriam.bag
            return;
60
        }
61 56244 miriam.bag
62
63 56083 miriam.bag
        //verify if entity is valid
64
        final OafProtos.OafEntity entity = getEntity(value, type);
65
        if (entity == null) {
66 56244 miriam.bag
            context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1);
67 56083 miriam.bag
            return;
68
        }
69 56244 miriam.bag
        final Set<String> toemitrelations = new HashSet<>();
70
        //verify if we have some relation
71
        for (String sem_rel : sem_rels)
72
            toemitrelations.addAll(getRelationTarget(value, sem_rel, context, COUNTER_PROPAGATION));
73 56083 miriam.bag
74 56244 miriam.bag
        if (toemitrelations.isEmpty()) {
75
            context.getCounter(COUNTER_PROPAGATION, "No allowed semantic relation present in result").increment(1);
76
            return;
77 56083 miriam.bag
        }
78
79 56244 miriam.bag
        //verify if we have a relation to a context in the body
80
        Set<String> contextIds = entity.getResult().getMetadata().getContextList()
81
                .stream()
82
                .map(ResultProtos.Result.Context::getId)
83
                .collect(Collectors.toSet());
84 56083 miriam.bag
85 56244 miriam.bag
        //verify if we have a relation to a context in the update part made by the inference
86
        NavigableMap<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(TypeProtos.Type.result.toString()));
87 56083 miriam.bag
88 56244 miriam.bag
        final Map<String, byte[]> stringMap = Maps.newHashMap();
89
        for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
90
            stringMap.put(Bytes.toString(e.getKey()), e.getValue());
91
        }
92 56083 miriam.bag
93 56244 miriam.bag
        // we fetch all the body updates
94
        for (final String o : stringMap.keySet()) {
95
            if (o.startsWith("update_")) {
96
                final OafProtos.Oaf update = OafProtos.Oaf.parseFrom(map.get(o));
97
                contextIds.addAll(update.getEntity().getResult().getMetadata().getContextList()
98
                        .stream()
99
                        .map(ResultProtos.Result.Context::getId)
100
                        .map(s -> s.split("::")[0])
101
                        .collect(Collectors.toSet()));
102
            }
103
        }
104 56083 miriam.bag
105 56244 miriam.bag
        //we verify if we have something
106
        if (contextIds.isEmpty()) {
107
            context.getCounter(COUNTER_PROPAGATION, "No context in the body and in the update of the result").increment(1);
108
            return;
109 56083 miriam.bag
        }
110
111 56244 miriam.bag
        //verify if some of the context collected for the result are associated to a community in the communityIdList
112
        for (String id : idCommunityList) {
113
            if (contextIds.contains(id)) {
114
                for (String target : toemitrelations) {
115
                    keyOut.set(target.getBytes());
116
                    valueOut.set(Value.newInstance(id).setTrust(trust).toJson());
117
                    context.write(keyOut, valueOut);
118
                    context.getCounter(COUNTER_PROPAGATION, "Emit propagation for " + id).increment(1);
119 56083 miriam.bag
                }
120 56244 miriam.bag
            }
121 56083 miriam.bag
        }
122
123 56244 miriam.bag
    }
124 56083 miriam.bag
125 56244 miriam.bag
}