Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2

    
3
import com.google.common.base.Splitter;
4
import com.google.common.collect.Lists;
5
import com.google.common.collect.Sets;
6
import com.google.protobuf.InvalidProtocolBufferException;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.compositekeys.InstOrgKey;
9
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
10
import eu.dnetlib.data.proto.*;
11
import org.apache.commons.collections.MapUtils;
12
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
import org.apache.hadoop.hbase.util.Bytes;
16
import org.apache.hadoop.io.Text;
17

    
18
import java.io.IOException;
19
import java.util.ArrayList;
20
import java.util.Map;
21
import java.util.Set;
22
import java.util.stream.Collectors;
23

    
24
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
25
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.*;
26
/**
27
 * Created by miriam on 17/08/2018.
28
 */
29
public class PropagationCountryFromDsOrgResultMapper extends TableMapper<InstOrgKey, Text> {
30

    
31
    private Text valueOut;
32

    
33
    private Set<String> datasourceTypes = Sets.newHashSet("pubsrepository::institutional");
34
    private Set<String> whiteList = Sets.newHashSet("10|opendoar____::300891a62162b960cf02ce3827bb363c");
35
    private Set<String> blackList = Sets.newHashSet("");
36

    
37
    @Override
38
    protected void setup(final Context context) throws IOException, InterruptedException {
39
        super.setup(context);
40

    
41
        valueOut = new Text();
42

    
43
        datasourceTypes.addAll(getParam(context, "datasource.types"));
44
        whiteList.addAll(getParam(context, "datasource.whitelist"));
45
    }
46

    
47
    @Override
48
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
49

    
50
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
51
        final OafProtos.OafEntity entity = getEntity(value,type);
52
        if (entity != null) {
53
            switch (type) {
54
                case datasource:
55
                    final DatasourceProtos.Datasource datasource = entity.getDatasource();
56
                    final String id = entity.getId();
57
                    if (datasource == null) {
58
                        throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto");
59
                    }
60

    
61
                    String dsType = datasource.getMetadata().getDatasourcetype().getClassid();
62
                    if (datasourceTypes.contains(dsType)) {
63
                        // verify datasource is in blacklist
64
                        if (blackList.contains(id)){
65
                            context.getCounter(COUNTER_PROPAGATION,"blacklisted").increment(1);
66
                            emitNotAllowedDatasource(context,entity.getId());
67

    
68
                        } else {
69
                            emitAllowedDatasource(value, context, entity.getId(), dsType);
70
                        }
71
                    } else {
72
                        // verify datasource is in whiteList
73

    
74
                        if (whiteList.contains(id)) {
75
                            context.getCounter(COUNTER_PROPAGATION,"whitelisted " + id).increment(1);
76
                            emitAllowedDatasource(value,context,entity.getId(),dsType);
77
                        } else {
78
                            emitNotAllowedDatasource(context, entity.getId());
79
                        }
80
                    }
81

    
82
                    break;
83
                case organization:
84
                    OrganizationProtos.Organization organization = entity.getOrganization();
85
                    if (organization == null) {
86
                        throw new RuntimeException("oaf type is organizationtoresult, but organizationtoresult proto is not found in oafproto");
87
                    }
88

    
89
                    FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry();
90
                    if (country == null) {
91
                        context.getCounter(COUNTER_PROPAGATION, "country elem does not exists").increment(1);
92
                    } else {
93
                        final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes());
94
                        if (MapUtils.isNotEmpty(ds_org)) {
95

    
96
                            for (String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) {
97

    
98
                                valueOut.set(Value.newInstance(country.getClassid()).toJson());
99
                                context.write( InstOrgKey.organization(dsId), valueOut);
100
                                context.getCounter(COUNTER_PROPAGATION, "country ").increment(1);
101
                            }
102
                        }
103
                    }
104

    
105
                    break;
106
                case result:
107
                    ResultProtos.Result result = entity.getResult();
108

    
109
                    for (ResultProtos.Result.Instance instance : result.getInstanceList()) {
110
                        //todo add check if key is not empty and field is not null
111

    
112
                        String hostedBy = instance.getHostedby().getKey();
113
                        valueOut.set(Value.newInstance(entity.getId()).toJson());
114
                        context.write( InstOrgKey.publication(hostedBy),valueOut);
115
                        context.getCounter(COUNTER_PROPAGATION, "emit hostedby | collectedfrom for publication ").increment(1);
116
                        String collectedFrom = instance.getCollectedfrom().getKey();
117
                        if (!hostedBy.equals(collectedFrom)) {
118
                            context.write(eu.dnetlib.data.mapreduce.hbase.propagation.compositekeys.InstOrgKey.publication(collectedFrom), valueOut);
119
                            context.getCounter(COUNTER_PROPAGATION, "emit hostedby | collectedfrom for publication ").increment(1);
120
                        }
121
                    }
122
                    break;
123
            }
124
        }
125
    }
126

    
127
    private void emitNotAllowedDatasource(Context context, String id) throws IOException, InterruptedException {
128
        valueOut.set(Value.newInstance(ZERO).toJson());
129
        context.write( InstOrgKey.datasource(id), valueOut);
130
        context.getCounter(COUNTER_PROPAGATION, "ds Type not in propagation allowed list").increment(1);
131
    }
132

    
133
    private void emitAllowedDatasource(Result value, Context context, String id, String dsType) throws IOException, InterruptedException {
134
        valueOut.set(Value.newInstance(ONE, getTrust(value)).toJson());
135
        context.write( InstOrgKey.datasource(id), valueOut);
136
        context.getCounter(COUNTER_PROPAGATION, String.format("%s in propagation allowed list", dsType)).increment(1);
137
    }
138

    
139

    
140

    
141
    private String getTrust(Result value) throws InvalidProtocolBufferException {
142
        final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes("datasource"));
143
        final byte[] body = map.get(Bytes.toBytes("body"));
144
        if (body != null){
145
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body);
146
            return oaf.getDataInfo().getTrust();
147
        }
148
        return null;
149
    }
150

    
151
    private ArrayList<String> getParam(Context context, String s) {
152
        return Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get(s, "")));
153
    }
154

    
155
}
(2-2/4)