Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.TypeProtos;
8
import org.apache.commons.lang3.StringUtils;
9
import org.apache.hadoop.hbase.client.Result;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14
import java.io.IOException;
15
import java.util.*;
16
import java.util.stream.Collectors;
17

    
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
20

    
21

    
22
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
23
    private static final String SEPARATOR = ",";
24
    private String[] sem_rels;
25
    private String trust;
26

    
27
    private ImmutableBytesWritable keyOut;
28
    private Text valueOut;
29

    
30

    
31

    
32
    @Override
33
    protected void setup(final Context context) throws IOException, InterruptedException {
34

    
35
        keyOut = new ImmutableBytesWritable();
36
        valueOut = new Text();
37

    
38
        sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_RELATION_SET);
39
        trust = context.getConfiguration().get("propagatetoproject.trust","0.85");
40

    
41
    }
42

    
43
    @Override
44
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
45
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
46
        //If the type is not result I do not need to process it
47
        if(!type.equals(TypeProtos.Type.result)) {
48
            return;
49
        }
50
        //verify if entity is valid
51
        final OafProtos.OafEntity entity = getEntity(value, type);
52
        if (entity == null) {
53
            context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1);
54
            return;
55
        }
56

    
57
        context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1);
58

    
59
        //selection of all the projects associated to this result
60
        String projectIdList = getProjectIdList(value,context);
61

    
62
        //if the list of projects is not empty, verify if it exists some allowed semantic relation to which propagate the project
63
        if(StringUtils.isNotBlank(projectIdList)){
64
            final Set<String> toemitrelations = new HashSet<>();
65
            //selection of all the results bind by this result considering all the allowed semantic relations
66
            for (String sem_rel : sem_rels) {
67
                toemitrelations.addAll(getRelationTarget(value, sem_rel, context));
68
            }
69
            if (!toemitrelations.isEmpty()) {
70
                emit(context, toemitrelations, projectIdList);
71
                context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size());
72

    
73

    
74
            }
75
            //This emit is to specify which projects are already associated to this result
76
            //Not to write an update from related result
77
            keyOut.set(entity.getId().getBytes());
78
            valueOut.set(Value.newInstance(projectIdList, Type.fromresult).toJson());
79

    
80
            context.write(keyOut, valueOut);
81
            context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
82
        }
83
    }
84

    
85
    //emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation
86
    private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException {
87
        for(String relation : toemitrelations){
88
            keyOut.set(relation.getBytes());
89
            valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson());
90
            context.write(keyOut, valueOut);
91
        }
92
    }
93

    
94
    //starting from the Result gets the list of projects it is related to and returns it as a csv
95
    private String getProjectIdList(Result value, final Context context) throws InvalidProtocolBufferException {
96
        Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY, context);
97
        return ret.size() == 0 ? null : String.join(SEPARATOR, ret);
98
    }
99

    
100
    private Set<String> getRelationTarget(Result value, String sem_rel, final Context context) throws InvalidProtocolBufferException {
101

    
102
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
103

    
104
        context.getCounter(COUNTER_PROPAGATION, sem_rel).increment(relationMap.size());
105

    
106

    
107
        /*
108
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
109
        return relationMap.keySet().stream()
110
                .map(String::new)
111
                .collect(Collectors.toCollection(HashSet::new));
112
        */
113

    
114
        return relationMap.values().stream()
115
                .map(this::asOaf)
116
                .filter(Objects::nonNull)
117
                .filter(o -> isValid(o))
118
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
119
                .map(o -> o.getRel().getTarget())
120
                .collect(Collectors.toCollection(HashSet::new));
121

    
122
    }
123

    
124
    private OafProtos.Oaf asOaf(byte[] r) {
125
        try {
126
            return OafProtos.Oaf.parseFrom(r);
127
        } catch (InvalidProtocolBufferException e) {
128
            return null;
129
        }
130
    }
131

    
132

    
133
    private boolean isValid(final OafProtos.Oaf oaf) {
134
        return (oaf != null) && oaf.isInitialized();
135
    }
136
}
(4-4/8)