Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.proto.*;
5
import org.apache.hadoop.hbase.client.Result;
6
import org.apache.hadoop.hbase.util.Bytes;
7
import org.apache.hadoop.mapreduce.Mapper;
8

    
9
import java.util.HashSet;
10
import java.util.Map;
11
import java.util.Objects;
12
import java.util.Set;
13
import java.util.stream.Collectors;
14

    
15
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
16
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.DATA_INFO_TYPE;
17

    
18
public final class Utils {
19
    public static  OafProtos.OafEntity getEntity(Result value, TypeProtos.Type type) throws InvalidProtocolBufferException {
20
        final byte[] body = value.getValue(Bytes.toBytes(type.toString()), Bytes.toBytes("body"));
21
        if (body != null){
22
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
23
            if(oaf.getDataInfo().getDeletedbyinference())
24
                return null;
25
            return oaf.getEntity();
26
        }
27
        return null;
28
    }
29

    
30
    public static FieldTypeProtos.DataInfo.Builder getDataInfo(String trust, String class_id,String schema_id,String schema_name, String data_info_type,String class_name) {
31
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
32
                .setInferred(true)
33
                .setProvenanceaction(getQualifier(class_id,schema_id,schema_name,class_name)                        )
34
                .setInferenceprovenance(data_info_type)
35
                .setTrust(trust);
36
        return builder;
37

    
38
    }
39

    
40
    public static FieldTypeProtos.Qualifier.Builder getQualifier(String class_id, String schema_id, String schema_name, String class_name){
41
        return FieldTypeProtos.Qualifier.newBuilder()
42
                .setClassid(class_id)
43
                .setClassname(class_name)
44
                .setSchemeid(schema_id)
45
                .setSchemename(schema_name);
46
    }
47

    
48

    
49
    public static FieldTypeProtos.Qualifier.Builder getCountry(String countryValue, String trust,String dnet_schema, String class_id,String schema_id,String schema_name, String data_info_type, String class_name) {
50

    
51
        final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
52
                .setClassid(countryValue)
53
                .setClassname(countryValue)
54
                .setSchemeid(dnet_schema)
55
                .setSchemename(dnet_schema);
56
        country.setDataInfo(getDataInfo(trust,class_id,schema_id,schema_name,data_info_type,class_name));//"Propagation of country information from datasources belonging to institutional repositories"));
57

    
58
        return country;
59
    }
60

    
61
    public static  ResultProtos.Result.Context getContext(String contextid, String trust) {
62
        final ResultProtos.Result.Context.Builder cBuilder = ResultProtos.Result.Context.newBuilder()
63
                .setId(contextid)
64
                .addDataInfo(getDataInfo(trust, CLASS_COMMUNITY_ID, SCHEMA_ID, SCHEMA_NAME, DATA_INFO_TYPE, DATA_INFO_TYPE));
65
        return cBuilder.build();
66
    }
67

    
68
    public static  ResultProtos.Result.Context getContext(String contextid, String trust, String class_id, String dataInfoType, String class_name) {
69
        final ResultProtos.Result.Context.Builder cBuilder = ResultProtos.Result.Context.newBuilder()
70
                .setId(contextid)
71
                .addDataInfo(getDataInfo(trust, class_id, SCHEMA_ID, SCHEMA_NAME, dataInfoType, class_name));
72
        return cBuilder.build();
73
    }
74

    
75
    public static Set<String> getRelationTarget(Result value, String sem_rel, final Mapper.Context context, String counter) {
76

    
77
        final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel));
78

    
79
        /*
80
        we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference
81
        return relationMap.keySet().stream()
82
                .map(String::new)
83
                .collect(Collectors.toCollection(HashSet::new));
84
        */
85

    
86
        HashSet<String> valid_relation = relationMap.values().stream()
87
                .map(b -> asOaf(b))
88
                .filter(Objects::nonNull)
89
                .filter(o -> isValid(o))
90
                .filter(o -> !o.getDataInfo().getDeletedbyinference())
91
                .map(o -> o.getRel().getTarget())
92
                .collect(Collectors.toCollection(HashSet::new));
93

    
94
        context.getCounter(counter, sem_rel).increment(valid_relation.size());
95

    
96
        return valid_relation;
97
    }
98

    
99
    private static OafProtos.Oaf asOaf(byte[] r) {
100
        try {
101
            return OafProtos.Oaf.parseFrom(r);
102
        } catch (InvalidProtocolBufferException e) {
103
            return null;
104
        }
105
    }
106

    
107
    public static OafProtos.Oaf getUpdate(ResultProtos.Result.Metadata.Builder metadata, String resultId) {
108
        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
109
        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
110
                .setType(TypeProtos.Type.result)
111
                .setId(resultId)
112
                .setResult(result);
113

    
114
        return OafProtos.Oaf.newBuilder()
115
                .setKind(KindProtos.Kind.entity)
116
                .setEntity(entity)
117
                .build();
118
    }
119

    
120
    private static boolean isValid(final OafProtos.Oaf oaf) {
121
        return (oaf != null) && oaf.isInitialized();
122
    }
123

    
124
}
(4-4/5)