Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2

    
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.bulktag.CommunityConfiguration;
5
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
9
import eu.dnetlib.data.proto.ResultProtos;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.apache.hadoop.hbase.client.Put;
14
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15
import org.apache.hadoop.hbase.mapreduce.TableReducer;
16
import org.apache.hadoop.hbase.util.Bytes;
17
import org.apache.hadoop.io.Text;
18

    
19
import java.io.IOException;
20
import java.util.HashSet;
21
import java.util.Iterator;
22
import java.util.Set;
23

    
24
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
25

    
26
public class PropagationCommunityThroughOrganizationReducer extends TableReducer<ImmutableBytesWritable, Text, ImmutableBytesWritable> {
27
    private static final Log log = LogFactory.getLog(PropagationCommunityThroughOrganizationReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
28
    private ImmutableBytesWritable keyOut;
29

    
30
    private CommunityConfiguration cc;
31

    
32
    @Override
33
    protected void setup(final Context context) throws IOException, InterruptedException {
34
        super.setup(context);
35
        keyOut = new ImmutableBytesWritable();
36

    
37
        final String conf = context.getConfiguration().get("tagging.conf");
38

    
39
        if (StringUtils.isBlank(conf)) {
40
            throw new IllegalArgumentException("missing organization community map configuration");
41
        }
42
        System.out.println("conf = " + conf);
43
        cc = CommunityConfigurationFactory.fromJson(conf);
44
    }
45

    
46

    
47
    @Override
48
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
49
        Iterator<Text> it = values.iterator();
50
        Set<String> communities = new HashSet<>();
51
        Set<String> resultIds = new HashSet<>();
52

    
53
        while(it.hasNext()){
54
            Value v = Value.fromJson(it.next().toString());
55
            if(v.getType() == PropagationConstants.Type.fromorganization){
56
                DedupedList dedup_list = new Gson().fromJson(v.getValue(),DedupedList.class);
57
                dedup_list.stream().forEach(org -> communities.addAll(cc.getCommunityForOrganizationValue(org)));
58
                communities.addAll(cc.getCommunityForOrganizationValue(Bytes.toString(key.get())));
59
            }
60
            if(v.getType() == PropagationConstants.Type.fromresult){
61
                resultIds.add(v.getValue());
62
            }
63

    
64
        }
65

    
66

    
67
        resultIds.stream().forEach(result -> {
68
            try {
69
                final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
70
                communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE, CLASS_ORGANIZATION_NAME)));
71
                final Put put = new Put(Bytes.toBytes(result)).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), Utils.getUpdate(metadata, result).toByteArray());
72
                keyOut.set(Bytes.toBytes(result));
73
                context.write(keyOut, put);
74
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
75
            } catch (IOException e) {
76
                e.printStackTrace();
77
            } catch (InterruptedException e) {
78
                e.printStackTrace();
79
            }
80
        });
81

    
82

    
83

    
84

    
85

    
86

    
87
    }
88

    
89

    
90
}
(4-4/4)