Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2

    
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.bulktag.CommunityConfiguration;
5
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
9
import eu.dnetlib.data.proto.ResultProtos;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.apache.hadoop.hbase.client.Put;
14
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15
import org.apache.hadoop.hbase.mapreduce.TableReducer;
16
import org.apache.hadoop.hbase.util.Bytes;
17
import org.apache.hadoop.hbase.util.Hash;
18
import org.apache.hadoop.io.Text;
19

    
20
import java.io.IOException;
21
import java.util.HashSet;
22
import java.util.Iterator;
23
import java.util.Set;
24

    
25
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
26

    
27
public class PropagationCommunityThroughOrganizationReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
28
    private static final Log log = LogFactory.getLog(PropagationCommunityThroughOrganizationReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
29
    private ImmutableBytesWritable keyOut;
30

    
31

    
32
    @Override
33
    protected void setup(final Context context) throws IOException, InterruptedException {
34
        super.setup(context);
35
        keyOut = new ImmutableBytesWritable();
36

    
37
    }
38

    
39

    
40
    @Override
41
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
42
        Iterator<Text> it = values.iterator();
43
        DedupedList communities = new DedupedList();
44
        Set<String> resultIds = new HashSet<>();
45

    
46
        while(it.hasNext()){
47
            Value v = Value.fromJson(it.next().toString());
48
            if(v.getType() == PropagationConstants.Type.fromorganization){
49
                communities.addAll(DedupedList.fromJson(v.getValue()));
50

    
51
            }
52

    
53
            if(v.getType() == PropagationConstants.Type.fromresult){
54
                resultIds.add(v.getValue());
55
            }
56

    
57
        }
58

    
59

    
60
        resultIds.stream().forEach(result -> {
61
            try {
62
                final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
63
                communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE, CLASS_ORGANIZATION_NAME)));
64
                final Put put = new Put(Bytes.toBytes(result)).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), Utils.getUpdate(metadata, result).toByteArray());
65
                keyOut.set(Bytes.toBytes(result));
66
                context.write(keyOut, put);
67
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
68
            } catch (IOException e) {
69
                e.printStackTrace();
70
            } catch (InterruptedException e) {
71
                e.printStackTrace();
72
            }
73
        });
74

    
75

    
76

    
77

    
78

    
79

    
80
    }
81

    
82

    
83
}
(5-5/5)