Project

General

Profile

« Previous | Next » 

Revision 57211

Final logic for propagating communities through organizations (products belonging to a given organization will be associated with the community)

View differences:

PropagationCommunityThroughOrganizationFileReducer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2 2

  
3
import com.google.gson.Gson;
4 3
import com.googlecode.protobuf.format.JsonFormat;
5
import eu.dnetlib.data.bulktag.CommunityConfiguration;
6
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
8 4
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
9 5
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
10 6
import eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult.CommunityToResultFileReducer;
11 7
import eu.dnetlib.data.proto.ResultProtos;
12
import org.apache.commons.lang.StringUtils;
13 8
import org.apache.commons.logging.Log;
14 9
import org.apache.commons.logging.LogFactory;
15 10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
16
import org.apache.hadoop.hbase.util.Bytes;
17 11
import org.apache.hadoop.io.Text;
18 12
import org.apache.hadoop.mapreduce.Reducer;
19 13

  
......
23 17
import java.util.Set;
24 18

  
25 19
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
26
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
27 20

  
28 21
public class PropagationCommunityThroughOrganizationFileReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> {
29 22

  
......
51 44

  
52 45
        while(it.hasNext()){
53 46
            Value v = Value.fromJson(it.next().toString());
54
            if(v.getType() == PropagationConstants.Type.fromorganization){
55
                communities.addAll(DedupedList.fromJson(v.getValue()));
56

  
47
            switch (v.getType()){
48
                case fromorganization:
49
                    communities.addAll(DedupedList.fromJson(v.getValue()));
50
                    break;
51
                case fromresult:
52
                    resultIds.add(v.getValue());
53
                    break;
57 54
            }
58 55

  
59
            if(v.getType() == PropagationConstants.Type.fromresult){
60
                resultIds.add(v.getValue());
61
            }
62 56

  
63 57
        }
64 58

  
65

  
66
        resultIds.stream().forEach(result -> {
67
            try {
68
                final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
69
                communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME)));
70

  
71
                String keyString = Bytes.toString(key.get());
72
                keyOut.set(keyString);
73

  
74
                outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, keyString)).getBytes());
59
        if(communities.size() > 0){
60
            final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
61
            communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME)));
62
            for(String result: resultIds){
63
                keyOut.set(result);
64
                outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, result)).getBytes());
75 65
                context.write(keyOut, outValue);
76 66
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
67
            }
77 68

  
69
        }
78 70

  
79
            } catch (IOException e) {
80
                e.printStackTrace();
81
            } catch (InterruptedException e) {
82
                e.printStackTrace();
83
            }
84
        });
85 71

  
86 72

  
87 73
    }

Also available in: Unified diff