Project

General

Profile

« Previous | Next » 

Revision 57211

final logic of propagation of community through organization (products belonging to given organization will be associated to the community)

View differences:

PropagationCommunityThroughOrganizationMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2 2

  
3 3
import com.google.gson.Gson;
4
import eu.dnetlib.data.bulktag.Organization;
5 4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
6 5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.proto.*;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.TypeProtos;
8
import org.apache.commons.lang3.StringUtils;
8 9
import org.apache.hadoop.hbase.client.Result;
9 10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10 11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
......
12 13
import org.apache.hadoop.io.Text;
13 14

  
14 15
import java.io.IOException;
15
import java.util.HashSet;
16 16
import java.util.Set;
17 17

  
18 18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
......
23 23
    private Text valueOut;
24 24
    private ImmutableBytesWritable keyOut;
25 25
    private OrganizationMap organizationMap;
26

  
26 27
    //seleziono il tipo della entry:
27 28
    //Result:
28 29
    //se non e' deleted by inference ed ha organizzazioni a cui e' associato,
......
36 37
        valueOut = new Text();
37 38
        keyOut = new ImmutableBytesWritable();
38 39
        organizationMap = new Gson().fromJson(context.getConfiguration().get("organization.community.map"), OrganizationMap.class);
40
        System.out.println("got organization map: " + new Gson().toJson(organizationMap)) ;
39 41
    }
40 42

  
41 43
    @Override
......
45 47
        if (entity != null) {
46 48
            switch (type) {
47 49
                case organization:
48
                    valueOut.set(Value.newInstance(
49
                            new Gson().toJson(
50
                                    getCommunityList(Bytes.toString(keyIn.get()),
51
                                            getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION)), //search for organization it merges
52
                                    DedupedList.class),
53
                            ORGANIZATION_COMMUNITY_TRUST,
54
                            Type.fromorganization).toJson());
55
                    context.write(keyIn, valueOut);
56
                    context.getCounter(COUNTER_PROPAGATION, "emit for organization ");
50
                    DedupedList communityList = getCommunityList(Bytes.toString(keyIn.get()),
51
                            getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION), context);
52
                    if (communityList.size() > 0){
53
                        valueOut.set(Value.newInstance(
54
                                new Gson().toJson(
55
                                        communityList, //search for organization it merges
56
                                        DedupedList.class),
57
                                ORGANIZATION_COMMUNITY_TRUST,
58
                                Type.fromorganization).toJson());
59
                        context.write(keyIn, valueOut);
60
                        context.getCounter(COUNTER_PROPAGATION, "emit for organization ").increment(1);
61
                    }else{
62
                        context.getCounter(COUNTER_PROPAGATION, "community list size = 0 ").increment(1);
63
                    }
64

  
57 65
                    break;
58 66
                case result:
59
                    Set<String> result_organization = getRelationTarget(value, REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION);
60
                    result_organization.stream().forEach(org -> {
61
                        try {
62
                            emit(org, Bytes.toString(keyIn.get()), context);
63
                        } catch (IOException e) {
64
                            e.printStackTrace();
65
                        } catch (InterruptedException e) {
66
                            e.printStackTrace();
67
                        }
68
                    });
69

  
67
                    Set<String> result_organization = getRelationTarget(value, RELATION_ORGANIZATION + REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION);
68
                    for(String org: result_organization)
69
                        emit(org, Bytes.toString(keyIn.get()), context);
70 70
                    break;
71 71
            }
72 72
        }
73 73
    }
74 74

  
75
    private DedupedList getCommunityList(String organizationId, Set<String> relationTarget) {
75
    private DedupedList getCommunityList(String organizationId, Set<String> relationTarget, Context context) {
76 76
        DedupedList communityList = new DedupedList();
77
        relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(org)));
78
        communityList.addAll(organizationMap.get(organizationId));
77
        relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(StringUtils.substringAfter(org, "|"))));
78
        communityList.addAll(organizationMap.get(StringUtils.substringAfter(organizationId,"|")));
79
        communityList.stream().forEach(c->context.getCounter(COUNTER_PROPAGATION,"found organization for " + c).increment(1));
79 80
        return communityList;
80 81
    }
81 82

  
......
83 84
        keyOut.set(Bytes.toBytes(org));
84 85
        valueOut.set(Value.newInstance(resId, ORGANIZATION_COMMUNITY_TRUST, Type.fromresult).toJson());
85 86
        context.write(keyOut,valueOut);
86
        context.getCounter(COUNTER_PROPAGATION, "emit for result");
87
        context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
87 88
    }
88 89

  
89 90
}

Also available in: Unified diff