Revision 57211
Added by Miriam Baglioni over 4 years ago
PropagationCommunityThroughOrganizationReducer.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization; |
2 | 2 |
|
3 |
import com.google.gson.Gson; |
|
4 |
import eu.dnetlib.data.bulktag.CommunityConfiguration; |
|
5 |
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants; |
|
7 | 3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils; |
8 | 4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
9 | 5 |
import eu.dnetlib.data.proto.ResultProtos; |
10 |
import org.apache.commons.lang.StringUtils; |
|
11 | 6 |
import org.apache.commons.logging.Log; |
12 | 7 |
import org.apache.commons.logging.LogFactory; |
13 | 8 |
import org.apache.hadoop.hbase.client.Put; |
14 | 9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
15 | 10 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
16 | 11 |
import org.apache.hadoop.hbase.util.Bytes; |
17 |
import org.apache.hadoop.hbase.util.Hash; |
|
18 | 12 |
import org.apache.hadoop.io.Text; |
19 | 13 |
|
20 | 14 |
import java.io.IOException; |
... | ... | |
45 | 39 |
|
46 | 40 |
while(it.hasNext()){ |
47 | 41 |
Value v = Value.fromJson(it.next().toString()); |
48 |
if(v.getType() == PropagationConstants.Type.fromorganization){ |
|
49 |
communities.addAll(DedupedList.fromJson(v.getValue())); |
|
50 |
|
|
42 |
switch (v.getType()){ |
|
43 |
case fromorganization: |
|
44 |
communities.addAll(DedupedList.fromJson(v.getValue())); |
|
45 |
break; |
|
46 |
case fromresult: |
|
47 |
resultIds.add(v.getValue()); |
|
48 |
break; |
|
51 | 49 |
} |
52 | 50 |
|
53 |
if(v.getType() == PropagationConstants.Type.fromresult){ |
|
54 |
resultIds.add(v.getValue()); |
|
55 |
} |
|
56 | 51 |
|
57 | 52 |
} |
58 | 53 |
|
59 |
|
|
60 |
resultIds.stream().forEach(result -> { |
|
61 |
try { |
|
62 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); |
|
63 |
communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE, CLASS_ORGANIZATION_NAME))); |
|
54 |
if(communities.size() > 0){ |
|
55 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); |
|
56 |
communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME))); |
|
57 |
for(String result: resultIds){ |
|
64 | 58 |
final Put put = new Put(Bytes.toBytes(result)).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), Utils.getUpdate(metadata, result).toByteArray()); |
65 | 59 |
keyOut.set(Bytes.toBytes(result)); |
66 | 60 |
context.write(keyOut, put); |
67 | 61 |
context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size()); |
68 |
} catch (IOException e) { |
|
69 |
e.printStackTrace(); |
|
70 |
} catch (InterruptedException e) { |
|
71 |
e.printStackTrace(); |
|
72 | 62 |
} |
73 |
}); |
|
74 | 63 |
|
64 |
} |
|
75 | 65 |
|
76 |
|
|
77 |
|
|
78 |
|
|
79 |
|
|
80 | 66 |
} |
81 | 67 |
|
82 | 68 |
|
Also available in: Unified diff
Final logic for propagating a community through an organization (products belonging to a given organization will be associated with the community).