Project

General

Profile

1 54836 miriam.bag
package eu.dnetlib.data.mapreduce.hbase.propagation.projecttoresult;
2
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.TypeProtos;
8
import org.apache.commons.lang3.StringUtils;
9
import org.apache.hadoop.hbase.client.Result;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14
import java.io.IOException;
15
import java.util.*;
16
import java.util.stream.Collectors;
17
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
20
21
22
public class ProjectToResultMapper extends TableMapper<ImmutableBytesWritable, Text> {
23
    private static final String SEPARATOR = ",";
24
    private String[] sem_rels;
25
    private String trust;
26
27
    private ImmutableBytesWritable keyOut;
28
    private Text valueOut;
29
30
31
32
    @Override
33
    protected void setup(final Context context) throws IOException, InterruptedException {
34
35
        keyOut = new ImmutableBytesWritable();
36
        valueOut = new Text();
37
38 56084 miriam.bag
        sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_PROJECT_RELATION_SET);
39 54836 miriam.bag
        trust = context.getConfiguration().get("propagatetoproject.trust","0.85");
40
41
    }
42
43
    @Override
44
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
45
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
46
        //If the type is not result I do not need to process it
47
        if(!type.equals(TypeProtos.Type.result)) {
48
            return;
49
        }
50
        //verify if entity is valid
51
        final OafProtos.OafEntity entity = getEntity(value, type);
52
        if (entity == null) {
53
            context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1);
54
            return;
55
        }
56
57
        context.getCounter(COUNTER_PROPAGATION, "Valid result ").increment(1);
58
59
        //selection of all the projects associated to this result
60 54871 miriam.bag
        String projectIdList = getProjectIdList(value,context);
61 54836 miriam.bag
62
        //if the list of projects is not empty, verify if it exists some allowed semantic relation to which propagate the project
63
        if(StringUtils.isNotBlank(projectIdList)){
64
            final Set<String> toemitrelations = new HashSet<>();
65
            //selection of all the results bind by this result considering all the allowed semantic relations
66 54871 miriam.bag
            for (String sem_rel : sem_rels) {
67
                toemitrelations.addAll(getRelationTarget(value, sem_rel, context));
68
            }
69
            if (!toemitrelations.isEmpty()) {
70
                emit(context, toemitrelations, projectIdList);
71
                context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size());
72 54836 miriam.bag
73
74 54871 miriam.bag
            }
75
            //This emit is to specify which projects are already associated to this result
76
            //Not to write an update from related result
77
            keyOut.set(entity.getId().getBytes());
78
            valueOut.set(Value.newInstance(projectIdList, Type.fromresult).toJson());
79 54836 miriam.bag
80 54871 miriam.bag
            context.write(keyOut, valueOut);
81
            context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
82
        }
83 54836 miriam.bag
    }
84
85
    //emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation
86
    private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException {
87
        for(String relation : toemitrelations){
88
            keyOut.set(relation.getBytes());
89
            valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson());
90
            context.write(keyOut, valueOut);
91
        }
92
    }
93
94
    //starting from the Result gets the list of projects it is related to and returns it as a csv
95 54871 miriam.bag
    private String getProjectIdList(Result value, final Context context) throws InvalidProtocolBufferException {
96
        Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY, context);
97 54836 miriam.bag
        return ret.size() == 0 ? null : String.join(SEPARATOR, ret);
98
    }
99
100 54871 miriam.bag
    private Set<String> getRelationTarget(Result value, String sem_rel, final Context context) throws InvalidProtocolBufferException {
101 54836 miriam.bag
102
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
103
104 54871 miriam.bag
        context.getCounter(COUNTER_PROPAGATION, sem_rel).increment(relationMap.size());
105
106
107 54836 miriam.bag
        /*
108
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
109
        return relationMap.keySet().stream()
110
                .map(String::new)
111
                .collect(Collectors.toCollection(HashSet::new));
112
        */
113
114
        return relationMap.values().stream()
115
                .map(this::asOaf)
116
                .filter(Objects::nonNull)
117
                .filter(o -> isValid(o))
118
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
119
                .map(o -> o.getRel().getTarget())
120
                .collect(Collectors.toCollection(HashSet::new));
121
122
    }
123
124
    private OafProtos.Oaf asOaf(byte[] r) {
125
        try {
126
            return OafProtos.Oaf.parseFrom(r);
127
        } catch (InvalidProtocolBufferException e) {
128
            return null;
129
        }
130
    }
131
132
133
    private boolean isValid(final OafProtos.Oaf oaf) {
134
        return (oaf != null) && oaf.isInitialized();
135
    }
136
}