Project

General

Profile

« Previous | Next » 

Revision 58601

less memory pressure on the hbase table export job, context propagation utils. Proto exporter aligned with most recent dhp.model changes

View differences:

Utils.java
11 11
import java.util.Objects;
12 12
import java.util.Set;
13 13
import java.util.stream.Collectors;
14
import java.util.stream.Stream;
14 15

  
15 16
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
16 17
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.DATA_INFO_TYPE;
17 18

  
18 19
public final class Utils {
20

  
21
    public static final int MAX_RELATIONS = 10000;
22

  
19 23
    public static  OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException {
20 24
        final byte[] body = value.getValue(Bytes.toBytes(type.toString()), Bytes.toBytes("body"));
21 25
        if (body != null){
......
74 78

  
75 79
    public static Set<String> getRelationTarget(Result value, String sem_rel, final Mapper.Context context, String counter) {
76 80

  
77
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
81
        HashSet<String> valid_relation = Stream.of(value.raw())
82
                .map(kv -> kv.split())
83
                .filter(s -> sem_rel.equals(new String(s.getFamily())))
84
                .map(s -> asOaf(s.getValue()))
85
                .filter(Objects::nonNull)
86
                .filter(o -> isValid(o))
87
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
88
                .map(o -> o.getRel().getTarget())
89
                .limit(MAX_RELATIONS)
90
                .collect(Collectors.toCollection(HashSet::new));
78 91

  
79 92
        /*
80 93
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
......
83 96
                .collect(Collectors.toCollection(HashSet::new));
84 97
        */
85 98

  
86
        HashSet<String> valid_relation = relationMap.values().stream()
87
                .map(b -> asOaf(b))
88
                .filter(Objects::nonNull)
89
                .filter(o -> isValid(o))
90
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
91
                .map(o -> o.getRel().getTarget())
92
                .collect(Collectors.toCollection(HashSet::new));
93

  
94 99
        context.getCounter(counter, sem_rel).increment(valid_relation.size());
95 100

  
96 101
        return valid_relation;

Also available in: Unified diff