Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
5
import eu.dnetlib.data.proto.ResultProtos;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.apache.hadoop.hbase.client.Put;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableReducer;
11
import org.apache.hadoop.hbase.util.Bytes;
12
import org.apache.hadoop.io.Text;
13

    
14
import java.io.IOException;
15
import java.util.HashSet;
16
import java.util.Iterator;
17
import java.util.Set;
18

    
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
20

    
21
public class PropagationCommunityThroughOrganizationReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
22
    private static final Log log = LogFactory.getLog(PropagationCommunityThroughOrganizationReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
23
    private ImmutableBytesWritable keyOut;
24

    
25

    
26
    @Override
27
    protected void setup(final Context context) throws IOException, InterruptedException {
28
        super.setup(context);
29
        keyOut = new ImmutableBytesWritable();
30

    
31
    }
32

    
33

    
34
    @Override
35
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
36
        Iterator<Text> it = values.iterator();
37
        DedupedList communities = new DedupedList();
38
        Set<String> resultIds = new HashSet<>();
39

    
40
        while (it.hasNext()) {
41
            Value v = Value.fromJson(it.next().toString());
42
            switch (v.getType()) {
43
                case fromorganization:
44
                    communities.addAll(DedupedList.fromJson(v.getValue()));
45
                    break;
46
                case fromresult:
47
                    resultIds.add(v.getValue());
48
                    break;
49
            }
50

    
51

    
52
        }
53

    
54
        if (communities.size() > 0) {
55
            final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
56
            communities.stream().forEach(community -> {
57
                metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE, CLASS_ORGANIZATION_NAME));
58
                context.getCounter(COUNTER_PROPAGATION, "added result to community " + community).increment(resultIds.size());
59
            });
60
            for (String result : resultIds) {
61
                final Put put = new Put(Bytes.toBytes(result)).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), Utils.getUpdate(metadata, result).toByteArray());
62
                keyOut.set(Bytes.toBytes(result));
63
                context.write(keyOut, put);
64
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
65
            }
66

    
67
        }
68

    
69
    }
70

    
71
}
(5-5/5)