Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation;
2

    
3
import com.google.common.base.Splitter;
4
import com.google.common.collect.Sets;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.*;
7
import org.apache.commons.collections.MapUtils;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12

    
13
import java.io.IOException;
14
import java.util.Map;
15
import java.util.Set;
16
import java.util.stream.Collectors;
17

    
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
19

    
20
/**
21
 * Created by miriam on 17/08/2018.
22
 */
23
public class PropagationCountryFromDsOrgResultMapper extends TableMapper<Key, ImmutableBytesWritable> {
24

    
25

    
26
    private ImmutableBytesWritable valueOut;
27

    
28
    private Set<String> datasourceTypes = Sets.newHashSet("pubsrepository::institutional");
29

    
30
    @Override
31
    protected void setup(final Context context) throws IOException, InterruptedException {
32
        super.setup(context);
33

    
34
        valueOut = new ImmutableBytesWritable();
35

    
36
        datasourceTypes.addAll(Splitter.on(",").omitEmptyStrings().splitToList(context.getConfiguration().get("datasource.types", "")));
37
    }
38

    
39
    @Override
40
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
41

    
42
        final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
43
        final byte[] body = resultMap.get(Bytes.toBytes("body"));
44
        if (body != null){
45
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
46
            OafProtos.OafEntity entity = oaf.getEntity();
47
            switch (OafRowKeyDecoder.decode(keyIn.copyBytes()).getType()) {
48
                case datasource:
49
                    DatasourceProtos.Datasource datasource = entity.getDatasource();
50
                    if (datasource == null){
51
                        throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto");
52
                    }
53
                    final Key datasource1 = Key.datasource(entity.getId());
54
                    String dsType = datasource.getMetadata().getDatasourcetype().getClassid();
55
                    if(datasourceTypes.contains(dsType)) {
56
                        context.getCounter(COUNTER_PROPAGATION, dsType).increment(1);
57
                        valueOut.set(ONE.getBytes());
58
                        context.write(datasource1,valueOut);
59
                    }else{
60
                        context.getCounter(COUNTER_PROPAGATION, String.format("not %s", dsType)).increment(1);
61
                        valueOut.set(ZERO.getBytes());
62
                        context.write(datasource1,valueOut);
63
                    }
64
                    break;
65
                case organization:
66
                    OrganizationProtos.Organization organization = entity.getOrganization();
67
                    if (organization == null){
68
                        throw new RuntimeException("oaf type is organization, but organization proto is not found in oafproto");
69
                    }
70

    
71
                    FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry();
72
                    if (country == null){
73
                        context.getCounter(COUNTER_PROPAGATION,"country elem does not exists").increment(1);
74
                    }else{
75
                        final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes());
76
                        if (MapUtils.isNotEmpty(ds_org)) {
77

    
78
                            for(String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) {
79
                                valueOut.set(country.getClassid().getBytes());
80
                                context.write(Key.organization(dsId), valueOut);
81
                                context.getCounter(COUNTER_PROPAGATION,"country ").increment(1);
82
                            }
83
                        }
84
                    }
85
                    break;
86
                case result:
87
                    ResultProtos.Result result = entity.getResult();
88

    
89
                    for(ResultProtos.Result.Instance instance : result.getInstanceList()) {
90
                        //todo add check if key is not empty and field is not null
91

    
92
                        String hostedBy = instance.getHostedby().getKey();
93
                        valueOut.set(entity.getId().getBytes());
94
                        context.write(Key.publication(hostedBy), valueOut);
95

    
96
                        String collectedFrom = instance.getCollectedfrom().getKey();
97
                        if (!hostedBy.equals(collectedFrom)) {
98
                            context.write(Key.publication(collectedFrom),valueOut);
99
                        }
100
                    }
101
                    break;
102
            }
103
        }
104

    
105

    
106
    }
107
}
(5-5/6)