Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.proto.*;
5
import org.apache.hadoop.hbase.client.Result;
6
import org.apache.hadoop.hbase.util.Bytes;
7
import org.apache.hadoop.mapreduce.Mapper;
8

    
9
import java.util.HashSet;
10
import java.util.Map;
11
import java.util.Objects;
12
import java.util.Set;
13
import java.util.stream.Collectors;
14
import java.util.stream.Stream;
15

    
16
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
17
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.DATA_INFO_TYPE;
18

    
19
public final class Utils {
20

    
21
    /**
     * Upper bound on the number of relation targets that {@code getRelationTarget} pulls from a
     * single row; it is applied as a stream {@code limit} before the targets are collected into a set.
     */
    public static final int MAX_RELATIONS = 10000;
22

    
23
    public static  OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException {
24
        final byte[] body = value.getValue(Bytes.toBytes(type.toString()), Bytes.toBytes("body"));
25
        if (body != null){
26
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
27
            if(oaf.getDataInfo().getDeletedbyinference())
28
                return null;
29
            return oaf.getEntity();
30
        }
31
        return null;
32
    }
33

    
34
    public static FieldTypeProtos.DataInfo.Builder getDataInfo(String trust, String class_id,String schema_id,String schema_name, String data_info_type,String class_name) {
35
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
36
                .setInferred(true)
37
                .setProvenanceaction(getQualifier(class_id,schema_id,schema_name,class_name)                        )
38
                .setInferenceprovenance(data_info_type)
39
                .setTrust(trust);
40
        return builder;
41

    
42
    }
43

    
44
    public static FieldTypeProtos.Qualifier.Builder getQualifier(String class_id, String schema_id, String schema_name, String class_name){
45
        return FieldTypeProtos.Qualifier.newBuilder()
46
                .setClassid(class_id)
47
                .setClassname(class_name)
48
                .setSchemeid(schema_id)
49
                .setSchemename(schema_name);
50
    }
51

    
52

    
53
    public static FieldTypeProtos.Qualifier.Builder getCountry(String countryValue, String trust,String dnet_schema, String class_id,String schema_id,String schema_name, String data_info_type, String class_name) {
54

    
55
        final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
56
                .setClassid(countryValue)
57
                .setClassname(countryValue)
58
                .setSchemeid(dnet_schema)
59
                .setSchemename(dnet_schema);
60
        country.setDataInfo(getDataInfo(trust,class_id,schema_id,schema_name,data_info_type,class_name));//"Propagation of country information from datasources belonging to institutional repositories"));
61

    
62
        return country;
63
    }
64

    
65
    public static  ResultProtos.Result.Context getContext(String contextid, String trust) {
66
        final ResultProtos.Result.Context.Builder cBuilder = ResultProtos.Result.Context.newBuilder()
67
                .setId(contextid)
68
                .addDataInfo(getDataInfo(trust, CLASS_COMMUNITY_ID, SCHEMA_ID, SCHEMA_NAME, DATA_INFO_TYPE, DATA_INFO_TYPE));
69
        return cBuilder.build();
70
    }
71

    
72
    public static  ResultProtos.Result.Context getContext(String contextid, String trust, String class_id, String dataInfoType, String class_name) {
73
        final ResultProtos.Result.Context.Builder cBuilder = ResultProtos.Result.Context.newBuilder()
74
                .setId(contextid)
75
                .addDataInfo(getDataInfo(trust, class_id, SCHEMA_ID, SCHEMA_NAME, dataInfoType, class_name));
76
        return cBuilder.build();
77
    }
78

    
79
    public static Set<String> getRelationTarget(Result value, String sem_rel, final Mapper.Context context, String counter) {
80

    
81
        HashSet<String> valid_relation = Stream.of(value.raw())
82
                .map(kv -> kv.split())
83
                .filter(s -> sem_rel.equals(new String(s.getFamily())))
84
                .map(s -> asOaf(s.getValue()))
85
                .filter(Objects::nonNull)
86
                .filter(o -> isValid(o))
87
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
88
                .map(o -> o.getRel().getTarget())
89
                .limit(MAX_RELATIONS)
90
                .collect(Collectors.toCollection(HashSet::new));
91

    
92
        /*
93
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
94
        return relationMap.keySet().stream()
95
                .map(String::new)
96
                .collect(Collectors.toCollection(HashSet::new));
97
        */
98

    
99
        context.getCounter(counter, sem_rel).increment(valid_relation.size());
100

    
101
        return valid_relation;
102
    }
103

    
104
    private static OafProtos.Oaf asOaf(byte[] r) {
105
        try {
106
            return OafProtos.Oaf.parseFrom(r);
107
        } catch (InvalidProtocolBufferException e) {
108
            return null;
109
        }
110
    }
111

    
112
    public static OafProtos.Oaf getUpdate(ResultProtos.Result.Metadata.Builder metadata, String resultId) {
113
        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
114
        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
115
                .setType(TypeProtos.Type.result)
116
                .setId(resultId)
117
                .setResult(result);
118

    
119
        return OafProtos.Oaf.newBuilder()
120
                .setKind(KindProtos.Kind.entity)
121
                .setEntity(entity)
122
                .build();
123
    }
124

    
125
    private static boolean isValid(final OafProtos.Oaf oaf) {
126
        return (oaf != null) && oaf.isInitialized();
127
    }
128

    
129
}
(4-4/5)