Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2

    
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.bulktag.Organization;
5
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
6
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.proto.*;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12
import org.apache.hadoop.io.Text;
13

    
14
import java.io.IOException;
15
import java.util.HashSet;
16
import java.util.Set;
17

    
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getEntity;
20
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getRelationTarget;
21

    
22
public class PropagationCommunityThroughOrganizationMapper extends TableMapper<ImmutableBytesWritable, Text> {
23
    private Text valueOut;
24
    private ImmutableBytesWritable keyOut;
25
    private OrganizationMap organizationMap;
26
    //seleziono il tipo della entry:
27
    //Result:
28
    //se non e' deleted by inference ed ha organizzazioni a cui e' associato,
29
    // //emetto id della relazione ed id del risultato per ogni organizzazione a cui e' associato
30
    //ORGANIZATION:
31
    //se non e' deleted by inference e non e' deduplicata emetto l'id della organizzazione
32
    //se non e' deleted by inference ed e' deduplicata, emetto id della organizzazione ed id del deduplicato e lista delle organizzazioni mergiate
33
    @Override
34
    protected void setup(final Context context) throws IOException, InterruptedException {
35
        super.setup(context);
36
        valueOut = new Text();
37
        keyOut = new ImmutableBytesWritable();
38
        organizationMap = new Gson().fromJson(context.getConfiguration().get("organization.community.map"), OrganizationMap.class);
39
    }
40

    
41
    @Override
42
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
43
        final TypeProtos.Type type = OafRowKeyDecoder.decode(keyIn.copyBytes()).getType();
44
        final OafProtos.OafEntity entity = getEntity(value, type);//getEntity already verified that it is not delByInference
45
        if (entity != null) {
46
            switch (type) {
47
                case organization:
48
                    valueOut.set(Value.newInstance(
49
                            new Gson().toJson(
50
                                    getCommunityList(Bytes.toString(keyIn.get()),
51
                                            getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION)), //search for organization it merges
52
                                    DedupedList.class),
53
                            ORGANIZATION_COMMUNITY_TRUST,
54
                            Type.fromorganization).toJson());
55
                    context.write(keyIn, valueOut);
56
                    context.getCounter(COUNTER_PROPAGATION, "emit for organization ");
57
                    break;
58
                case result:
59
                    Set<String> result_organization = getRelationTarget(value, REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION);
60
                    result_organization.stream().forEach(org -> {
61
                        try {
62
                            emit(org, Bytes.toString(keyIn.get()), context);
63
                        } catch (IOException e) {
64
                            e.printStackTrace();
65
                        } catch (InterruptedException e) {
66
                            e.printStackTrace();
67
                        }
68
                    });
69

    
70
                    break;
71
            }
72
        }
73
    }
74

    
75
    private DedupedList getCommunityList(String organizationId, Set<String> relationTarget) {
76
        DedupedList communityList = new DedupedList();
77
        relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(org)));
78
        communityList.addAll(organizationMap.get(organizationId));
79
        return communityList;
80
    }
81

    
82
    private void emit(String org, String resId, Context context) throws IOException, InterruptedException {
83
        keyOut.set(Bytes.toBytes(org));
84
        valueOut.set(Value.newInstance(resId, ORGANIZATION_COMMUNITY_TRUST, Type.fromresult).toJson());
85
        context.write(keyOut,valueOut);
86
        context.getCounter(COUNTER_PROPAGATION, "emit for result");
87
    }
88

    
89
}
(4-4/5)