Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2

    
3
import com.googlecode.protobuf.format.JsonFormat;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
5
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult.CommunityToResultFileReducer;
7
import eu.dnetlib.data.proto.ResultProtos;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Reducer;
13

    
14
import java.io.IOException;
15
import java.util.HashSet;
16
import java.util.Iterator;
17
import java.util.Set;
18

    
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
20

    
21
public class PropagationCommunityThroughOrganizationFileReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> {
22

    
23
    private static final Log log = LogFactory.getLog(CommunityToResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
24

    
25
    private Text keyOut;
26
    private Text outValue;
27

    
28

    
29

    
30
    @Override
31
    protected void setup(final Context context) throws IOException, InterruptedException {
32
        super.setup(context);
33
        keyOut = new Text("");
34
        outValue = new Text();
35

    
36
    }
37

    
38

    
39
    @Override
40
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
41
        Iterator<Text> it = values.iterator();
42
        DedupedList communities = new DedupedList();
43
        Set<String> resultIds = new HashSet<>();
44

    
45
        while(it.hasNext()){
46
            Value v = Value.fromJson(it.next().toString());
47
            switch (v.getType()){
48
                case fromorganization:
49
                    communities.addAll(DedupedList.fromJson(v.getValue()));
50
                    break;
51
                case fromresult:
52
                    resultIds.add(v.getValue());
53
                    break;
54
            }
55

    
56

    
57
        }
58

    
59
        if(communities.size() > 0){
60
            final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
61
            communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME)));
62
            for(String result: resultIds){
63
                keyOut.set(result);
64
                outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, result)).getBytes());
65
                context.write(keyOut, outValue);
66
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
67
            }
68

    
69
        }
70

    
71

    
72

    
73
    }
74
}
(3-3/5)