Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.TypeProtos;
8
import org.apache.commons.lang3.StringUtils;
9
import org.apache.hadoop.hbase.client.Result;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14
import java.io.IOException;
15
import java.util.*;
16
import java.util.stream.Collectors;
17

    
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
20

    
21

    
22
//public class ProjectToResultMapper extends TableMapper<ProjectToResultKey, Text> {
23
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
24
    private static final String SEPARATOR = ",";
25
    private String[] sem_rels;
26
    private String trust;
27

    
28
    private ImmutableBytesWritable keyOut;
29
    private Text valueOut;
30

    
31

    
32

    
33
    @Override
34
    protected void setup(final Context context) throws IOException, InterruptedException {
35

    
36
        keyOut = new ImmutableBytesWritable();
37
        valueOut = new Text();
38

    
39
        sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_RELATION_SET);
40
        trust = context.getConfiguration().get("propagatetoproject.trust","0.85");
41

    
42
    }
43

    
44
    @Override
45
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
46
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
47
        //If the type is not result I do not need to process it
48
        if(!type.equals(TypeProtos.Type.result)) {
49
            return;
50
        }
51
        //verify if entity is valid
52
        final OafProtos.OafEntity entity = getEntity(value, type);
53
        if (entity == null) {
54
            context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1);
55
            return;
56
        }
57

    
58
        context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1);
59

    
60
        //selection of all the projects associated to this result
61
        String projectIdList = getProjectIdList(value);
62

    
63
        //if the list of projects is not empty, verify if it exists some allowed semantic relation to which propagate the project
64
        if(StringUtils.isNotBlank(projectIdList)){
65
            final Set<String> toemitrelations = new HashSet<>();
66
            //selection of all the results bind by this result considering all the allowed semantic relations
67
            for (String sem_rel : sem_rels)
68
                toemitrelations.addAll(getRelationTarget(value,sem_rel));
69
            emit(context, toemitrelations, projectIdList);
70
            context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size());
71
        }
72

    
73
        keyOut.set(entity.getId().getBytes());
74
        valueOut.set(Value.newInstance(projectIdList,Type.fromresult).toJson());
75
        //context.write(ProjectToResultKey.create(FROM_RESULT,entity.getId()), valueOut);
76

    
77
        context.write(keyOut, valueOut);
78
        context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
79

    
80
    }
81

    
82
    //emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation
83
    private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException {
84
        for(String relation : toemitrelations){
85
            keyOut.set(relation.getBytes());
86
            valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson());
87
            context.write(keyOut, valueOut);
88
        }
89
    }
90

    
91
    //starting from the Result gets the list of projects it is related to and returns it as a csv
92
    private String getProjectIdList(Result value) throws InvalidProtocolBufferException {
93
        Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY);
94
        return ret.size() == 0 ? null : String.join(SEPARATOR, ret);
95
    }
96

    
97
    private Set<String> getRelationTarget(Result value, String sem_rel) throws InvalidProtocolBufferException {
98

    
99
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
100

    
101
        /*
102
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
103
        return relationMap.keySet().stream()
104
                .map(String::new)
105
                .collect(Collectors.toCollection(HashSet::new));
106
        */
107

    
108
        return relationMap.values().stream()
109
                .map(this::asOaf)
110
                .filter(Objects::nonNull)
111
                .filter(o -> isValid(o))
112
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
113
                .map(o -> o.getRel().getTarget())
114
                .collect(Collectors.toCollection(HashSet::new));
115

    
116
    }
117

    
118
    private OafProtos.Oaf asOaf(byte[] r) {
119
        try {
120
            return OafProtos.Oaf.parseFrom(r);
121
        } catch (InvalidProtocolBufferException e) {
122
            return null;
123
        }
124
    }
125

    
126

    
127
    private boolean isValid(final OafProtos.Oaf oaf) {
128
        return (oaf != null) && oaf.isInitialized();
129
    }
130
}
(4-4/8)