Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2

    
3
import com.google.gson.Gson;
4
import com.googlecode.protobuf.format.JsonFormat;
5
import eu.dnetlib.data.bulktag.CommunityConfiguration;
6
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
8
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
9
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
10
import eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult.CommunityToResultFileReducer;
11
import eu.dnetlib.data.proto.ResultProtos;
12
import org.apache.commons.lang.StringUtils;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
16
import org.apache.hadoop.hbase.util.Bytes;
17
import org.apache.hadoop.io.Text;
18
import org.apache.hadoop.mapreduce.Reducer;
19

    
20
import java.io.IOException;
21
import java.util.HashSet;
22
import java.util.Iterator;
23
import java.util.Set;
24

    
25
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
26
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
27

    
28
public class PropagationCommunityThroughOrganizationFileReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> {
29

    
30
    private static final Log log = LogFactory.getLog(CommunityToResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
31

    
32
    private Text keyOut;
33
    private Text outValue;
34

    
35

    
36

    
37
    @Override
38
    protected void setup(final Context context) throws IOException, InterruptedException {
39
        super.setup(context);
40
        keyOut = new Text("");
41
        outValue = new Text();
42

    
43
    }
44

    
45

    
46
    @Override
47
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
48
        Iterator<Text> it = values.iterator();
49
        DedupedList communities = new DedupedList();
50
        Set<String> resultIds = new HashSet<>();
51

    
52
        while(it.hasNext()){
53
            Value v = Value.fromJson(it.next().toString());
54
            if(v.getType() == PropagationConstants.Type.fromorganization){
55
                communities.addAll(DedupedList.fromJson(v.getValue()));
56

    
57
            }
58

    
59
            if(v.getType() == PropagationConstants.Type.fromresult){
60
                resultIds.add(v.getValue());
61
            }
62

    
63
        }
64

    
65

    
66
        resultIds.stream().forEach(result -> {
67
            try {
68
                final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
69
                communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME)));
70

    
71
                String keyString = Bytes.toString(key.get());
72
                keyOut.set(keyString);
73

    
74
                outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, keyString)).getBytes());
75
                context.write(keyOut, outValue);
76
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
77

    
78

    
79
            } catch (IOException e) {
80
                e.printStackTrace();
81
            } catch (InterruptedException e) {
82
                e.printStackTrace();
83
            }
84
        });
85

    
86

    
87
    }
88
}
(3-3/5)