Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult;
2

    
3
import com.google.common.collect.Lists;
4
import com.google.common.collect.Maps;
5
import com.google.common.reflect.TypeToken;
6
import com.google.gson.Gson;
7
import com.google.protobuf.InvalidProtocolBufferException;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
9
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
12
import eu.dnetlib.data.proto.FieldTypeProtos;
13
import eu.dnetlib.data.proto.OafProtos;
14
import eu.dnetlib.data.proto.ResultProtos;
15
import eu.dnetlib.data.proto.TypeProtos;
16

    
17
import org.apache.hadoop.hbase.client.Result;
18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19
import org.apache.hadoop.hbase.mapreduce.TableMapper;
20
import org.apache.hadoop.hbase.util.Bytes;
21
import org.apache.hadoop.io.Text;
22

    
23
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
24
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
25
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
26

    
27
import java.io.IOException;
28
import java.lang.reflect.Type;
29
import java.util.*;
30
import java.util.stream.Collectors;
31

    
32

    
33
public class CommunityToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
34

    
35
    private ImmutableBytesWritable keyOut;
36
    private Text valueOut;
37
    private String[] sem_rels;
38
    private String trust;
39
    List<String> idCommunityList;
40

    
41
    @Override
42
    protected void setup(final Context context) throws IOException, InterruptedException {
43
        Type listType = new TypeToken<List<String>>() {}.getType();
44

    
45
        idCommunityList = new Gson().fromJson(context.getConfiguration().get("community.id.list"),listType);
46
        keyOut = new ImmutableBytesWritable();
47
        valueOut = new Text();
48

    
49
        sem_rels = context.getConfiguration().getStrings("propagatetocommunity.semanticrelations", DEFAULT_COMMUNITY_RELATION_SET);
50
        trust = context.getConfiguration().get("propagatetocommunity.trust","0.85");
51

    
52
    }
53

    
54
    @Override
55
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
56
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
57
        //If the type is not result I do not need to process it
58
        if (!type.equals(TypeProtos.Type.result)) {
59
            return;
60
        }
61

    
62

    
63
        //verify if entity is valid
64
        final OafProtos.OafEntity entity = getEntity(value, type);
65
        if (entity == null) {
66
            context.getCounter(COUNTER_PROPAGATION, "Del by inference or null body for result").increment(1);
67
            return;
68
        }
69
        final Set<String> toemitrelations = new HashSet<>();
70
        //verify if we have some relation
71
        for (String sem_rel : sem_rels)
72
            toemitrelations.addAll(getRelationTarget(value, sem_rel, context, COUNTER_PROPAGATION));
73

    
74
        if (toemitrelations.isEmpty()) {
75
            context.getCounter(COUNTER_PROPAGATION, "No allowed semantic relation present in result").increment(1);
76
            return;
77
        }
78

    
79
        //verify if we have a relation to a context in the body
80
        Set<String> contextIds = entity.getResult().getMetadata().getContextList()
81
                .stream()
82
                .map(ResultProtos.Result.Context::getId)
83
                .collect(Collectors.toSet());
84

    
85
        //verify if we have a relation to a context in the update part made by the inference
86
        NavigableMap<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(TypeProtos.Type.result.toString()));
87

    
88
        final Map<String, byte[]> stringMap = Maps.newHashMap();
89
        for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
90
            stringMap.put(Bytes.toString(e.getKey()), e.getValue());
91
        }
92

    
93
        // we fetch all the body updates
94
        for (final String o : stringMap.keySet()) {
95
            if (o.startsWith("update_")) {
96
                final OafProtos.Oaf update = OafProtos.Oaf.parseFrom(map.get(o));
97
                contextIds.addAll(update.getEntity().getResult().getMetadata().getContextList()
98
                        .stream()
99
                        .map(ResultProtos.Result.Context::getId)
100
                        .map(s -> s.split("::")[0])
101
                        .collect(Collectors.toSet()));
102
            }
103
        }
104

    
105
        //we verify if we have something
106
        if (contextIds.isEmpty()) {
107
            context.getCounter(COUNTER_PROPAGATION, "No context in the body and in the update of the result").increment(1);
108
            return;
109
        }
110

    
111
        //verify if some of the context collected for the result are associated to a community in the communityIdList
112
        for (String id : idCommunityList) {
113
            if (contextIds.contains(id)) {
114
                for (String target : toemitrelations) {
115
                    keyOut.set(target.getBytes());
116
                    valueOut.set(Value.newInstance(id).setTrust(trust).toJson());
117
                    context.write(keyOut, valueOut);
118
                    context.getCounter(COUNTER_PROPAGATION, "Emit propagation for " + id).increment(1);
119
                }
120
            }
121
        }
122

    
123
    }
124

    
125
}
126

    
(2-2/3)