package eu.dnetlib.data.mapreduce.hbase.propagation;

import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.*;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;

import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;

/**
18
 * Created by miriam on 17/08/2018.
19
 */
20
public class PropagationCountryInstitutionalOrganizationMapper extends TableMapper<Key, ImmutableBytesWritable> {
21

    
22

    
23
    private ImmutableBytesWritable valueOut;
24

    
25

    
26
    @Override
27
    protected void setup(final Context context) throws IOException, InterruptedException {
28
        super.setup(context);
29

    
30
        valueOut = new ImmutableBytesWritable();
31
    }
32

    
33
    @Override
34
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
35

    
36
        final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
37
        final byte[] body = resultMap.get(Bytes.toBytes("body"));
38
        if (body != null){
39
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
40
            OafProtos.OafEntity entity = oaf.getEntity();
41
            switch (OafRowKeyDecoder.decode(keyIn.copyBytes()).getType()) {
42
                case datasource:
43
                    DatasourceProtos.Datasource datasource = entity.getDatasource();
44
                    if (datasource == null){
45
                        throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto");
46
                    }
47
                    final Key datasource1 = Key.datasource(entity.getId());
48
                    if(datasource.getMetadata().getDatasourcetype().getClassid().equals("pubsrepository::institutional")){
49
                        context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
50
                        valueOut.set(ONE.getBytes());
51
                        context.write(datasource1,valueOut);
52
                    }else{
53
                        context.getCounter(COUNTER_PROPAGATION," not institutional datasource").increment(1);
54
                        valueOut.set(ZERO.getBytes());
55
                        context.write(datasource1,valueOut);
56
                    }
57
                    break;
58
                case organization:
59
                    OrganizationProtos.Organization organization = entity.getOrganization();
60
                    if (organization == null){
61
                        throw new RuntimeException("oaf type is organization, but organization proto is not found in oafproto");
62
                    }
63

    
64
                    FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry();
65
                    if (country == null){
66
                        context.getCounter(COUNTER_PROPAGATION,"country elem does not exists").increment(1);
67
                    }else{
68
                        final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes());
69
                        if (MapUtils.isNotEmpty(ds_org)) {
70

    
71
                            for(String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) {
72
                                valueOut.set(country.getClassid().getBytes());
73
                                context.write(Key.organization(dsId), valueOut);
74
                                context.getCounter(COUNTER_PROPAGATION,"country ").increment(1);
75
                            }
76
                        }
77
                    }
78
                    break;
79
                case result:
80
                    ResultProtos.Result result = entity.getResult();
81

    
82
                    for(ResultProtos.Result.Instance instance : result.getInstanceList()) {
83
                        //todo add check if key is not empty and field is not null
84

    
85
                        String hostedBy = instance.getHostedby().getKey();
86
                        valueOut.set(entity.getId().getBytes());
87
                        context.write(Key.publication(hostedBy), valueOut);
88

    
89
                        String collectedFrom = instance.getCollectedfrom().getKey();
90
                        if (!hostedBy.equals(collectedFrom)) {
91
                            context.write(Key.publication(collectedFrom),valueOut);
92
                        }
93
                    }
94
                    break;
95
            }
96
        }
97

    
98

    
99
    }
100
}
(5-5/6)