Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.TypeProtos;
8
import org.apache.commons.lang3.StringUtils;
9
import org.apache.hadoop.hbase.client.Result;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14
import java.io.IOException;
15
import java.util.*;
16
import java.util.stream.Collectors;
17

    
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
20

    
21

    
22
//public class ProjectToResultMapper extends TableMapper<ProjectToResultKey, Text> {
23
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
24
    private static final String SEPARATOR = ",";
25
    private String[] sem_rels;
26
    private String trust;
27

    
28
    private ImmutableBytesWritable keyOut;
29
    private Text valueOut;
30

    
31

    
32

    
33
    @Override
34
    protected void setup(final Context context) throws IOException, InterruptedException {
35

    
36
        keyOut = new ImmutableBytesWritable();
37
        valueOut = new Text();
38

    
39
        sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_RELATION_SET);
40
        trust = context.getConfiguration().get("propagatetoproject.trust","0.85");
41

    
42

    
43
    }
44

    
45
    @Override
46
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
47
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
48
        //If the type is not result I do not need to process it
49
        if(!type.equals(TypeProtos.Type.result)) {
50
            return;
51
        }
52
        //verify if entity is valid
53
        final OafProtos.OafEntity entity = getEntity(value, type);
54
        if (entity == null) {
55
            context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1);
56
            return;
57
        }
58

    
59
        context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1);
60

    
61
        //selection of all the projects associated to this result
62
        String projectIdList = getProjectIdList(value);
63

    
64
        //if the list of projects is not empty, verify if it exists some allowed semantic relation to which propagate the project
65
        if(StringUtils.isNotBlank(projectIdList)){
66
            final Set<String> toemitrelations = new HashSet<>();
67
            //selection of all the results bind by this result considering all the allowed semantic relations
68
            for (String sem_rel : sem_rels)
69
                toemitrelations.addAll(getRelationTarget(value,sem_rel));
70
            emit(context, toemitrelations, projectIdList);
71
            context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size());
72
        }
73

    
74
        keyOut.set(entity.getId().getBytes());
75
        valueOut.set(Value.newInstance(projectIdList,Type.fromresult).toJson());
76
        //context.write(ProjectToResultKey.create(FROM_RESULT,entity.getId()), valueOut);
77

    
78
        context.write(keyOut, valueOut);
79
        context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
80

    
81
    }
82

    
83
    //emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation
84
    private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException {
85
        for(String relation : toemitrelations){
86
            keyOut.set(relation.getBytes());
87
            valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson());
88
            context.write(keyOut, valueOut);
89
        }
90
    }
91

    
92
    //starting from the Result gets the list of projects it is related to and returns it as a csv
93
    private String getProjectIdList(Result value) throws InvalidProtocolBufferException {
94
        Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY);
95
        return ret.size() == 0 ? null : String.join(SEPARATOR, ret);
96
    }
97

    
98
    private Set<String> getRelationTarget(Result value, String sem_rel) throws InvalidProtocolBufferException {
99

    
100
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
101

    
102
        /*
103
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
104
        return relationMap.keySet().stream()
105
                .map(String::new)
106
                .collect(Collectors.toCollection(HashSet::new));
107
        */
108

    
109
        return relationMap.values().stream()
110
                .map(this::asOaf)
111
                .filter(Objects::nonNull)
112
                .filter(o -> isValid(o))
113
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
114
                .map(o -> o.getRel().getTarget())
115
                .collect(Collectors.toCollection(HashSet::new));
116

    
117
    }
118

    
119
    private OafProtos.Oaf asOaf(byte[] r) {
120
        try {
121
            return OafProtos.Oaf.parseFrom(r);
122
        } catch (InvalidProtocolBufferException e) {
123
            return null;
124
        }
125
    }
126

    
127

    
128
    private boolean isValid(final OafProtos.Oaf oaf) {
129
        return (oaf != null) && oaf.isInitialized();
130
    }
131
}
(4-4/6)