Revision 57211
Added by Miriam Baglioni over 4 years ago
PropagationCommunityThroughOrganizationMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization; |
2 | 2 |
|
3 | 3 |
import com.google.gson.Gson; |
4 |
import eu.dnetlib.data.bulktag.Organization; |
|
5 | 4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
6 | 5 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
7 |
import eu.dnetlib.data.proto.*; |
|
6 |
import eu.dnetlib.data.proto.OafProtos; |
|
7 |
import eu.dnetlib.data.proto.TypeProtos; |
|
8 |
import org.apache.commons.lang3.StringUtils; |
|
8 | 9 |
import org.apache.hadoop.hbase.client.Result; |
9 | 10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
10 | 11 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
... | ... | |
12 | 13 |
import org.apache.hadoop.io.Text; |
13 | 14 |
|
14 | 15 |
import java.io.IOException; |
15 |
import java.util.HashSet; |
|
16 | 16 |
import java.util.Set; |
17 | 17 |
|
18 | 18 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
... | ... | |
23 | 23 |
private Text valueOut; |
24 | 24 |
private ImmutableBytesWritable keyOut; |
25 | 25 |
private OrganizationMap organizationMap; |
26 |
|
|
26 | 27 |
//seleziono il tipo della entry: |
27 | 28 |
//Result: |
28 | 29 |
//se non e' deleted by inference ed ha organizzazioni a cui e' associato, |
... | ... | |
36 | 37 |
valueOut = new Text(); |
37 | 38 |
keyOut = new ImmutableBytesWritable(); |
38 | 39 |
organizationMap = new Gson().fromJson(context.getConfiguration().get("organization.community.map"), OrganizationMap.class); |
40 |
System.out.println("got organization map: " + new Gson().toJson(organizationMap)) ; |
|
39 | 41 |
} |
40 | 42 |
|
41 | 43 |
@Override |
... | ... | |
45 | 47 |
if (entity != null) { |
46 | 48 |
switch (type) { |
47 | 49 |
case organization: |
48 |
valueOut.set(Value.newInstance( |
|
49 |
new Gson().toJson( |
|
50 |
getCommunityList(Bytes.toString(keyIn.get()), |
|
51 |
getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION)), //search for organization it merges |
|
52 |
DedupedList.class), |
|
53 |
ORGANIZATION_COMMUNITY_TRUST, |
|
54 |
Type.fromorganization).toJson()); |
|
55 |
context.write(keyIn, valueOut); |
|
56 |
context.getCounter(COUNTER_PROPAGATION, "emit for organization "); |
|
50 |
DedupedList communityList = getCommunityList(Bytes.toString(keyIn.get()), |
|
51 |
getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION), context); |
|
52 |
if (communityList.size() > 0){ |
|
53 |
valueOut.set(Value.newInstance( |
|
54 |
new Gson().toJson( |
|
55 |
communityList, //search for organization it merges |
|
56 |
DedupedList.class), |
|
57 |
ORGANIZATION_COMMUNITY_TRUST, |
|
58 |
Type.fromorganization).toJson()); |
|
59 |
context.write(keyIn, valueOut); |
|
60 |
context.getCounter(COUNTER_PROPAGATION, "emit for organization ").increment(1); |
|
61 |
}else{ |
|
62 |
context.getCounter(COUNTER_PROPAGATION, "community list size = 0 ").increment(1); |
|
63 |
} |
|
64 |
|
|
57 | 65 |
break; |
58 | 66 |
case result: |
59 |
Set<String> result_organization = getRelationTarget(value, REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION); |
|
60 |
result_organization.stream().forEach(org -> { |
|
61 |
try { |
|
62 |
emit(org, Bytes.toString(keyIn.get()), context); |
|
63 |
} catch (IOException e) { |
|
64 |
e.printStackTrace(); |
|
65 |
} catch (InterruptedException e) { |
|
66 |
e.printStackTrace(); |
|
67 |
} |
|
68 |
}); |
|
69 |
|
|
67 |
Set<String> result_organization = getRelationTarget(value, RELATION_ORGANIZATION + REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION); |
|
68 |
for(String org: result_organization) |
|
69 |
emit(org, Bytes.toString(keyIn.get()), context); |
|
70 | 70 |
break; |
71 | 71 |
} |
72 | 72 |
} |
73 | 73 |
} |
74 | 74 |
|
75 |
private DedupedList getCommunityList(String organizationId, Set<String> relationTarget) { |
|
75 |
private DedupedList getCommunityList(String organizationId, Set<String> relationTarget, Context context) {
|
|
76 | 76 |
DedupedList communityList = new DedupedList(); |
77 |
relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(org))); |
|
78 |
communityList.addAll(organizationMap.get(organizationId)); |
|
77 |
relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(StringUtils.substringAfter(org, "|")))); |
|
78 |
communityList.addAll(organizationMap.get(StringUtils.substringAfter(organizationId,"|"))); |
|
79 |
communityList.stream().forEach(c->context.getCounter(COUNTER_PROPAGATION,"found organization for " + c).increment(1)); |
|
79 | 80 |
return communityList; |
80 | 81 |
} |
81 | 82 |
|
... | ... | |
83 | 84 |
keyOut.set(Bytes.toBytes(org)); |
84 | 85 |
valueOut.set(Value.newInstance(resId, ORGANIZATION_COMMUNITY_TRUST, Type.fromresult).toJson()); |
85 | 86 |
context.write(keyOut,valueOut); |
86 |
context.getCounter(COUNTER_PROPAGATION, "emit for result"); |
|
87 |
context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
|
|
87 | 88 |
} |
88 | 89 |
|
89 | 90 |
} |
Also available in: Unified diff
final logic of propagation of community through organization (products belonging to given organization will be associated to the community)