Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2

    
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.TypeProtos;
8
import org.apache.commons.lang3.StringUtils;
9
import org.apache.hadoop.hbase.client.Result;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14

    
15
import java.io.IOException;
16
import java.util.Set;
17

    
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
20
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
21

    
22
public class PropagationCommunityThroughOrganizationMapper extends TableMapper<ImmutableBytesWritable, Text> {
23
    private Text valueOut;
24
    private ImmutableBytesWritable keyOut;
25
    private OrganizationMap organizationMap;
26

    
27
    //seleziono il tipo della entry:
28
    //Result:
29
    //se non e' deleted by inference ed ha organizzazioni a cui e' associato,
30
    // //emetto id della relazione ed id del risultato per ogni organizzazione a cui e' associato
31
    //ORGANIZATION:
32
    //se non e' deleted by inference e non e' deduplicata emetto l'id della organizzazione
33
    //se non e' deleted by inference ed e' deduplicata, emetto id della organizzazione ed id del deduplicato e lista delle organizzazioni mergiate
34
    @Override
35
    protected void setup(final Context context) throws IOException, InterruptedException {
36
        super.setup(context);
37
        valueOut = new Text();
38
        keyOut = new ImmutableBytesWritable();
39
        organizationMap = new Gson().fromJson(context.getConfiguration().get("organization.community.map"), OrganizationMap.class);
40
        System.out.println("got organization map: " + new Gson().toJson(organizationMap)) ;
41
    }
42

    
43
    @Override
44
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
45
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
46
        final OafProtos.OafEntity entity = getEntity(value, type);//getEntity already verified that it is not delByInference
47
        if (entity != null) {
48
            switch (type) {
49
                case organization:
50
                    DedupedList communityList = getCommunityList(Bytes.toString(keyIn.get()),
51
                            getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION), context);
52
                    if (communityList.size() > 0){
53
                        valueOut.set(Value.newInstance(
54
                                new Gson().toJson(
55
                                        communityList, //search for organization it merges
56
                                        DedupedList.class),
57
                                ORGANIZATION_COMMUNITY_TRUST,
58
                                Type.fromorganization).toJson());
59
                        context.write(keyIn, valueOut);
60
                        context.getCounter(COUNTER_PROPAGATION, "emit for organization ").increment(1);
61
                    }else{
62
                        context.getCounter(COUNTER_PROPAGATION, "community list size = 0 ").increment(1);
63
                    }
64

    
65
                    break;
66
                case result:
67
                    Set<String> result_organization = getRelationTarget(value, RELATION_ORGANIZATION + REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION);
68
                    for(String org: result_organization)
69
                        emit(org, Bytes.toString(keyIn.get()), context);
70
                    break;
71
            }
72
        }
73
    }
74

    
75
    private DedupedList getCommunityList(String organizationId, Set<String> relationTarget, Context context) {
76
        DedupedList communityList = new DedupedList();
77
        relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(StringUtils.substringAfter(org, "|"))));
78
        communityList.addAll(organizationMap.get(StringUtils.substringAfter(organizationId,"|")));
79
        communityList.stream().forEach(c->context.getCounter(COUNTER_PROPAGATION,"found organization for " + c).increment(1));
80
        return communityList;
81
    }
82

    
83
    private void emit(String org, String resId, Context context) throws IOException, InterruptedException {
84
        keyOut.set(Bytes.toBytes(org));
85
        valueOut.set(Value.newInstance(resId, ORGANIZATION_COMMUNITY_TRUST, Type.fromresult).toJson());
86
        context.write(keyOut,valueOut);
87
        context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
88
    }
89

    
90
}
(4-4/5)