Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

    
3
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
4
import eu.dnetlib.data.proto.*;
5
import org.apache.commons.collections.MapUtils;
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.hbase.util.Bytes;
10

    
11
import java.io.IOException;
12
import java.util.Map;
13
import java.util.stream.Collectors;
14

    
15
/**
16
 * Created by miriam on 17/08/2018.
17
 */
18
public class PropagationCountryInstitutionalOrganizationMapper extends TableMapper<Key, ImmutableBytesWritable> {
19

    
20

    
21
    private ImmutableBytesWritable valueOut;
22

    
23

    
24
    @Override
25
    protected void setup(final Context context) throws IOException, InterruptedException {
26
        super.setup(context);
27

    
28
        valueOut = new ImmutableBytesWritable();
29

    
30
    }
31

    
32
    @Override
33
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
34

    
35
        final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
36
        final byte[] body = resultMap.get(Bytes.toBytes("body"));
37
        if (body != null){
38
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
39
            OafProtos.OafEntity entity = oaf.getEntity();
40
            switch (OafRowKeyDecoder.decode(keyIn.copyBytes()).getType()) {
41
                case datasource:
42
                    DatasourceProtos.Datasource datasource = entity.getDatasource();
43
                    if (datasource == null){
44
                        throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto");
45
                    }
46
                    final Key datasource1 = Key.datasource(entity.getId());
47
                    if(datasource.getMetadata().getDatasourcetype().getClassid().equals("pubsrepository::institutional")){
48
                        context.getCounter("Propagation","institutional datasource").increment(1);
49
                        valueOut.set("1".getBytes());
50
                        context.write(datasource1,valueOut);
51
                    }else{
52
                        context.getCounter("Propagation"," not institutional datasource").increment(1);
53
                        valueOut.set("0".getBytes());
54
                        context.write(datasource1,valueOut);
55
                    }
56
                    break;
57
                case organization:
58
                    OrganizationProtos.Organization organization = entity.getOrganization();
59
                    if (organization == null){
60
                        throw new RuntimeException("oaf type is organization, but organization proto is not found in oafproto");
61
                    }
62

    
63
                    FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry();
64
                    if (country == null){
65
                        context.getCounter("Propagation","country elem does not exists").increment(1);
66
                    }else{
67
                        final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes());
68
                        if (MapUtils.isNotEmpty(ds_org)) {
69

    
70
                            for(String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) {
71
                                valueOut.set(country.getClassid().getBytes());
72
                                context.write(Key.organization(dsId), valueOut);
73
                                context.getCounter("Propagation","country ").increment(1);
74
                            }
75
                        }
76
                    }
77
                    break;
78
                case result:
79
                    ResultProtos.Result result = entity.getResult();
80

    
81
                    for(ResultProtos.Result.Instance instance : result.getInstanceList()) {
82
                        //todo add check if key is not empty and field is not null
83

    
84
                        String hostedBy = instance.getHostedby().getKey();
85
                        valueOut.set(entity.getId().getBytes());
86
                        context.write(Key.publication(hostedBy), valueOut);
87

    
88
                        String collectedFrom = instance.getCollectedfrom().getKey();
89
                        if (!hostedBy.equals(collectedFrom)) {
90
                            context.write(Key.publication(collectedFrom),valueOut);
91
                        }
92
                    }
93
                    break;
94
            }
95
        }
96

    
97

    
98

    
99

    
100
//        if (body != null) {
101
//            context.getCounter("Bulk Tagging", "not null body ").increment(1);
102

    
103
//            final OafProtos.Oaf oaf = tagger.enrichContext(, cc, context);
104
//            if (oaf == null) {
105
//                //context.getCounter("In mapper", " null oaf ").increment(1);
106
//                return;
107
//            }
108
//
109
//            long tagged = oaf.getEntity().getResult().getMetadata().getContextList().stream()
110
//                    .flatMap(c -> c.getDataInfoList().stream())
111
//                    .map(FieldTypeProtos.DataInfo::getInferenceprovenance)
112
//                    .filter(infProv -> "bulktagging::community".equals(infProv))
113
//                    .count();
114
//            context.getCounter("Bulk Tagging", " bulktagged ").increment(tagged);
115
//
116
//
117
//            final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("update"), Bytes.toBytes("update_"+System.nanoTime()), oaf.toByteArray());
118
//
119
//            if(tagged > 0){
120
//                context.write(key, put);
121
//                context.getCounter("Bulk Tagging", " write op ").increment(1);
122
//            }
123
//
124
//        }
125
//        else{
126
//            context.getCounter("Bulk Tagging", " null body ").increment(1);
127
//        }
128

    
129
    }
130
}
(4-4/5)