Project

General

Profile

« Previous | Next » 

Revision 54039

modified type for type variable. From int to Type

View differences:

ProjectToResultMapper.java
5 5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6 6
import eu.dnetlib.data.proto.OafProtos;
7 7
import eu.dnetlib.data.proto.TypeProtos;
8
import org.apache.commons.lang3.StringUtils;
8 9
import org.apache.hadoop.hbase.client.Result;
9 10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10 11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11 12
import org.apache.hadoop.hbase.util.Bytes;
12 13
import org.apache.hadoop.io.Text;
13 14
import java.io.IOException;
14
import java.util.Collection;
15
import java.util.HashSet;
16
import java.util.Map;
17
import java.util.Set;
15
import java.util.*;
16
import java.util.stream.Collectors;
18 17

  
19 18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
20 19
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
......
25 24
    private static final String SEPARATOR = ",";
26 25
    private String[] sem_rels;
27 26
    private String trust;
27

  
28
    private ImmutableBytesWritable keyOut;
28 29
    private Text valueOut;
29 30

  
31

  
32

  
30 33
    @Override
31 34
    protected void setup(final Context context) throws IOException, InterruptedException {
32
        super.setup(context);
35

  
36
        keyOut = new ImmutableBytesWritable();
33 37
        valueOut = new Text();
34
        String[] default_set = {"resultResult_supplement_isSupplementedBy"};
35 38

  
36
        sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations",default_set);
39
        sem_rels = context.getConfiguration().getStrings("propagatetoproject.semanticrelations", DEFAULT_RELATION_SET);
37 40
        trust = context.getConfiguration().get("propagatetoproject.trust","0.85");
38 41

  
39 42

  
......
43 46
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
44 47
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
45 48
        //If the type is not result I do not need to process it
46
        if(type!= TypeProtos.Type.result){
49
        if(!type.equals(TypeProtos.Type.result)) {
47 50
            return;
48 51
        }
49 52
        //verify if entity is valid
50
        final OafProtos.OafEntity entity = getEntity(value,type);
53
        final OafProtos.OafEntity entity = getEntity(value, type);
51 54
        if (entity == null) {
52 55
            context.getCounter(COUNTER_PROPAGATION,"Del by inference or null body for result").increment(1);
53 56
            return;
......
59 62
        String projectIdList = getProjectIdList(value);
60 63

  
61 64
        //if the list of projects is not empty, verify if it exists some allowed semantic relation to which propagate the project
62
        if(projectIdList != null){
65
        if(StringUtils.isNotBlank(projectIdList)){
63 66
            final Set<String> toemitrelations = new HashSet<>();
64 67
            //selection of all the results bind by this result considering all the allowed semantic relations
65 68
            for (String sem_rel : sem_rels)
......
68 71
            context.getCounter(COUNTER_PROPAGATION, "emit for semantic relation").increment(toemitrelations.size());
69 72
        }
70 73

  
71
        valueOut.set(Value.newInstance(projectIdList,FROM_RESULT).toJson());
74
        keyOut.set(entity.getId().getBytes());
75
        valueOut.set(Value.newInstance(projectIdList,Type.fromresult).toJson());
72 76
        //context.write(ProjectToResultKey.create(FROM_RESULT,entity.getId()), valueOut);
73
        context.write(new ImmutableBytesWritable(entity.getId().getBytes()), valueOut);
77

  
78
        context.write(keyOut, valueOut);
74 79
        context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
75 80

  
76 81
    }
77 82

  
78 83
    //emit for each valid semantic relation the id of the relation target and the list of projects associated to the source of the relation
79 84
    private void emit( Context context, Set<String> toemitrelations, String projectIdList) throws IOException, InterruptedException {
80
        for(String relation:toemitrelations){
81
            valueOut.set(Value.newInstance( projectIdList,trust,FROM_SEM_REL).toJson());
82
            context.write(new ImmutableBytesWritable(relation.getBytes()), valueOut);
83

  
85
        for(String relation : toemitrelations){
86
            keyOut.set(relation.getBytes());
87
            valueOut.set(Value.newInstance( projectIdList,trust,Type.fromsemrel).toJson());
88
            context.write(keyOut, valueOut);
84 89
        }
85 90
    }
86 91

  
87
    private Collection<? extends String> getRelationTarget(Result value, String sem_rel) throws InvalidProtocolBufferException {
88
        final Set<String> toemitrelations = new HashSet<>();
92
    //starting from the Result gets the list of projects it is related to and returns it as a csv
93
    private String getProjectIdList(Result value) throws InvalidProtocolBufferException {
94
        Set<String> ret = getRelationTarget(value, OUTCOME_PRODUCEDBY);
95
        return ret.size() == 0 ? null : String.join(SEPARATOR, ret);
96
    }
97

  
98
    private Set<String> getRelationTarget(Result value, String sem_rel) throws InvalidProtocolBufferException {
99

  
89 100
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
90
        for (byte[] relation : relationMap.values()) {
91
            final OafProtos.Oaf rel_oaf = OafProtos.Oaf.parseFrom(relation);
92
            if (isValid(rel_oaf)) {
93
                if (!rel_oaf.getDataInfo().getDeletedbyinference()) {
94
                    toemitrelations.add(rel_oaf.getRel().getTarget());
95
                }
96
            }
97 101

  
98
        }
99
        return toemitrelations;
100
    }
102
        /*
103
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
104
        return relationMap.keySet().stream()
105
                .map(String::new)
106
                .collect(Collectors.toCollection(HashSet::new));
107
        */
101 108

  
109
        return relationMap.values().stream()
110
                .map(this::asOaf)
111
                .filter(Objects::nonNull)
112
                .filter(o -> isValid(o))
113
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
114
                .map(o -> o.getRel().getTarget())
115
                .collect(Collectors.toCollection(HashSet::new));
102 116

  
103
    //starting from the Result gets the list of projects it is related to and returns it as a csv
104
    private String getProjectIdList(Result value) throws InvalidProtocolBufferException {
105
        Collection<? extends String> ret = getRelationTarget(value, "resultProject_outcome_isProducedBy");
106
        return ret.size() == 0 ? null : String.join(SEPARATOR,ret);
107 117
    }
108 118

  
119
    private OafProtos.Oaf asOaf(byte[] r) {
120
        try {
121
            return OafProtos.Oaf.parseFrom(r);
122
        } catch (InvalidProtocolBufferException e) {
123
            return null;
124
        }
125
    }
109 126

  
110 127

  
111 128
    private boolean isValid(final OafProtos.Oaf oaf) {

Also available in: Unified diff