Revision 57211
Added by Miriam Baglioni over 4 years ago
PropagationCommunityThroughOrganizationFileReducer.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization; |
2 | 2 |
|
3 |
import com.google.gson.Gson; |
|
4 | 3 |
import com.googlecode.protobuf.format.JsonFormat; |
5 |
import eu.dnetlib.data.bulktag.CommunityConfiguration; |
|
6 |
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants; |
|
8 | 4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils; |
9 | 5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
10 | 6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult.CommunityToResultFileReducer; |
11 | 7 |
import eu.dnetlib.data.proto.ResultProtos; |
12 |
import org.apache.commons.lang.StringUtils; |
|
13 | 8 |
import org.apache.commons.logging.Log; |
14 | 9 |
import org.apache.commons.logging.LogFactory; |
15 | 10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
16 |
import org.apache.hadoop.hbase.util.Bytes; |
|
17 | 11 |
import org.apache.hadoop.io.Text; |
18 | 12 |
import org.apache.hadoop.mapreduce.Reducer; |
19 | 13 |
|
... | ... | |
23 | 17 |
import java.util.Set; |
24 | 18 |
|
25 | 19 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
26 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION; |
|
27 | 20 |
|
28 | 21 |
public class PropagationCommunityThroughOrganizationFileReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> { |
29 | 22 |
|
... | ... | |
51 | 44 |
|
52 | 45 |
while(it.hasNext()){ |
53 | 46 |
Value v = Value.fromJson(it.next().toString()); |
54 |
if(v.getType() == PropagationConstants.Type.fromorganization){ |
|
55 |
communities.addAll(DedupedList.fromJson(v.getValue())); |
|
56 |
|
|
47 |
switch (v.getType()){ |
|
48 |
case fromorganization: |
|
49 |
communities.addAll(DedupedList.fromJson(v.getValue())); |
|
50 |
break; |
|
51 |
case fromresult: |
|
52 |
resultIds.add(v.getValue()); |
|
53 |
break; |
|
57 | 54 |
} |
58 | 55 |
|
59 |
if(v.getType() == PropagationConstants.Type.fromresult){ |
|
60 |
resultIds.add(v.getValue()); |
|
61 |
} |
|
62 | 56 |
|
63 | 57 |
} |
64 | 58 |
|
65 |
|
|
66 |
resultIds.stream().forEach(result -> { |
|
67 |
try { |
|
68 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); |
|
69 |
communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME))); |
|
70 |
|
|
71 |
String keyString = Bytes.toString(key.get()); |
|
72 |
keyOut.set(keyString); |
|
73 |
|
|
74 |
outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, keyString)).getBytes()); |
|
59 |
if(communities.size() > 0){ |
|
60 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); |
|
61 |
communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME))); |
|
62 |
for(String result: resultIds){ |
|
63 |
keyOut.set(result); |
|
64 |
outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, result)).getBytes()); |
|
75 | 65 |
context.write(keyOut, outValue); |
76 | 66 |
context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size()); |
67 |
} |
|
77 | 68 |
|
69 |
} |
|
78 | 70 |
|
79 |
} catch (IOException e) { |
|
80 |
e.printStackTrace(); |
|
81 |
} catch (InterruptedException e) { |
|
82 |
e.printStackTrace(); |
|
83 |
} |
|
84 |
}); |
|
85 | 71 |
|
86 | 72 |
|
87 | 73 |
} |
Also available in: Unified diff
final logic of propagation of community through organization (products belonging to given organization will be associated to the community)