Project

General

Profile

« Previous | Next » 

Revision 57211

Final logic for propagating a community through an organization (products belonging to a given organization will be associated with the community).

View differences:

PropagationCommunityThroughOrganizationReducer.java
1 1
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization;
2 2

  
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.bulktag.CommunityConfiguration;
5
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants;
7 3
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils;
8 4
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
9 5
import eu.dnetlib.data.proto.ResultProtos;
10
import org.apache.commons.lang.StringUtils;
11 6
import org.apache.commons.logging.Log;
12 7
import org.apache.commons.logging.LogFactory;
13 8
import org.apache.hadoop.hbase.client.Put;
14 9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15 10
import org.apache.hadoop.hbase.mapreduce.TableReducer;
16 11
import org.apache.hadoop.hbase.util.Bytes;
17
import org.apache.hadoop.hbase.util.Hash;
18 12
import org.apache.hadoop.io.Text;
19 13

  
20 14
import java.io.IOException;
......
45 39

  
46 40
        while(it.hasNext()){
47 41
            Value v = Value.fromJson(it.next().toString());
48
            if(v.getType() == PropagationConstants.Type.fromorganization){
49
                communities.addAll(DedupedList.fromJson(v.getValue()));
50

  
42
            switch (v.getType()){
43
                case fromorganization:
44
                    communities.addAll(DedupedList.fromJson(v.getValue()));
45
                    break;
46
                case fromresult:
47
                    resultIds.add(v.getValue());
48
                    break;
51 49
            }
52 50

  
53
            if(v.getType() == PropagationConstants.Type.fromresult){
54
                resultIds.add(v.getValue());
55
            }
56 51

  
57 52
        }
58 53

  
59

  
60
        resultIds.stream().forEach(result -> {
61
            try {
62
                final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
63
                communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE, CLASS_ORGANIZATION_NAME)));
54
        if(communities.size() > 0){
55
            final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder();
56
            communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME)));
57
            for(String result: resultIds){
64 58
                final Put put = new Put(Bytes.toBytes(result)).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), Utils.getUpdate(metadata, result).toByteArray());
65 59
                keyOut.set(Bytes.toBytes(result));
66 60
                context.write(keyOut, put);
67 61
                context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size());
68
            } catch (IOException e) {
69
                e.printStackTrace();
70
            } catch (InterruptedException e) {
71
                e.printStackTrace();
72 62
            }
73
        });
74 63

  
64
        }
75 65

  
76

  
77

  
78

  
79

  
80 66
    }
81 67

  
82 68

  

Also available in: Unified diff