Revision 57211
Added by Miriam Baglioni over 4 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationConstants.java | ||
---|---|---|
77 | 77 |
public static final String DEDUP_RELATION_RESULT = REL_TYPE_RESULT + "_" + SUBREL_DEDUP + "_"; |
78 | 78 |
|
79 | 79 |
|
80 |
public static final String[] DEFAULT_ORGANIZATION_RELATION_SET = new String[]{"resultOrganization_affiliation_isAuthorInstitutionOf","resultOrganization_affiliation_hasAuthorInstitution"}; |
|
80 |
//public static final String[] DEFAULT_ORGANIZATION_RELATION_SET = new String[]{"resultOrganization_affiliation_isAuthorInstitutionOf","resultOrganization_affiliation_hasAuthorInstitution"};
|
|
81 | 81 |
|
82 | 82 |
|
83 | 83 |
public static final Set<String> DEFAULT_ALLOWED_DATASOURCES = Sets.newHashSet("pubsrepository::institutional"); |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/Utils.java | ||
---|---|---|
78 | 78 |
|
79 | 79 |
final Map<byte[], byte[]> relationMap = value.getFamilyMap(Bytes.toBytes(sem_rel)); |
80 | 80 |
|
81 |
context.getCounter(counter, sem_rel).increment(relationMap.size()); |
|
82 |
|
|
83 |
|
|
84 | 81 |
/* |
85 | 82 |
we could extract the target qualifiers from the familyMap's keyset, but we also need to check the relationship is not deletedbyinference |
86 | 83 |
return relationMap.keySet().stream() |
... | ... | |
88 | 85 |
.collect(Collectors.toCollection(HashSet::new)); |
89 | 86 |
*/ |
90 | 87 |
|
91 |
return relationMap.values().stream()
|
|
88 |
HashSet<String> valid_relation = relationMap.values().stream()
|
|
92 | 89 |
.map(b -> asOaf(b)) |
93 | 90 |
.filter(Objects::nonNull) |
94 | 91 |
.filter(o -> isValid(o)) |
... | ... | |
96 | 93 |
.map(o -> o.getRel().getTarget()) |
97 | 94 |
.collect(Collectors.toCollection(HashSet::new)); |
98 | 95 |
|
96 |
context.getCounter(counter, sem_rel).increment(valid_relation.size()); |
|
97 |
|
|
98 |
return valid_relation; |
|
99 | 99 |
} |
100 | 100 |
|
101 | 101 |
private static OafProtos.Oaf asOaf(byte[] r) { |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitythroughorganization/PropagationCommunityThroughOrganizationMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization; |
2 | 2 |
|
3 | 3 |
import com.google.gson.Gson; |
4 |
import eu.dnetlib.data.bulktag.Organization; |
|
5 | 4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
6 | 5 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
7 |
import eu.dnetlib.data.proto.*; |
|
6 |
import eu.dnetlib.data.proto.OafProtos; |
|
7 |
import eu.dnetlib.data.proto.TypeProtos; |
|
8 |
import org.apache.commons.lang3.StringUtils; |
|
8 | 9 |
import org.apache.hadoop.hbase.client.Result; |
9 | 10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
10 | 11 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
... | ... | |
12 | 13 |
import org.apache.hadoop.io.Text; |
13 | 14 |
|
14 | 15 |
import java.io.IOException; |
15 |
import java.util.HashSet; |
|
16 | 16 |
import java.util.Set; |
17 | 17 |
|
18 | 18 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
... | ... | |
23 | 23 |
private Text valueOut; |
24 | 24 |
private ImmutableBytesWritable keyOut; |
25 | 25 |
private OrganizationMap organizationMap; |
26 |
|
|
26 | 27 |
//seleziono il tipo della entry: |
27 | 28 |
//Result: |
28 | 29 |
//se non e' deleted by inference ed ha organizzazioni a cui e' associato, |
... | ... | |
36 | 37 |
valueOut = new Text(); |
37 | 38 |
keyOut = new ImmutableBytesWritable(); |
38 | 39 |
organizationMap = new Gson().fromJson(context.getConfiguration().get("organization.community.map"), OrganizationMap.class); |
40 |
System.out.println("got organization map: " + new Gson().toJson(organizationMap)) ; |
|
39 | 41 |
} |
40 | 42 |
|
41 | 43 |
@Override |
... | ... | |
45 | 47 |
if (entity != null) { |
46 | 48 |
switch (type) { |
47 | 49 |
case organization: |
48 |
valueOut.set(Value.newInstance( |
|
49 |
new Gson().toJson( |
|
50 |
getCommunityList(Bytes.toString(keyIn.get()), |
|
51 |
getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION)), //search for organization it merges |
|
52 |
DedupedList.class), |
|
53 |
ORGANIZATION_COMMUNITY_TRUST, |
|
54 |
Type.fromorganization).toJson()); |
|
55 |
context.write(keyIn, valueOut); |
|
56 |
context.getCounter(COUNTER_PROPAGATION, "emit for organization "); |
|
50 |
DedupedList communityList = getCommunityList(Bytes.toString(keyIn.get()), |
|
51 |
getRelationTarget(value, DEDUP_RELATION_ORGANIZATION + REL_DEDUP_REPRESENTATIVE_RESULT, context, COUNTER_PROPAGATION), context); |
|
52 |
if (communityList.size() > 0){ |
|
53 |
valueOut.set(Value.newInstance( |
|
54 |
new Gson().toJson( |
|
55 |
communityList, //search for organization it merges |
|
56 |
DedupedList.class), |
|
57 |
ORGANIZATION_COMMUNITY_TRUST, |
|
58 |
Type.fromorganization).toJson()); |
|
59 |
context.write(keyIn, valueOut); |
|
60 |
context.getCounter(COUNTER_PROPAGATION, "emit for organization ").increment(1); |
|
61 |
}else{ |
|
62 |
context.getCounter(COUNTER_PROPAGATION, "community list size = 0 ").increment(1); |
|
63 |
} |
|
64 |
|
|
57 | 65 |
break; |
58 | 66 |
case result: |
59 |
Set<String> result_organization = getRelationTarget(value, REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION); |
|
60 |
result_organization.stream().forEach(org -> { |
|
61 |
try { |
|
62 |
emit(org, Bytes.toString(keyIn.get()), context); |
|
63 |
} catch (IOException e) { |
|
64 |
e.printStackTrace(); |
|
65 |
} catch (InterruptedException e) { |
|
66 |
e.printStackTrace(); |
|
67 |
} |
|
68 |
}); |
|
69 |
|
|
67 |
Set<String> result_organization = getRelationTarget(value, RELATION_ORGANIZATION + REL_RESULT_ORGANIZATION, context, COUNTER_PROPAGATION); |
|
68 |
for(String org: result_organization) |
|
69 |
emit(org, Bytes.toString(keyIn.get()), context); |
|
70 | 70 |
break; |
71 | 71 |
} |
72 | 72 |
} |
73 | 73 |
} |
74 | 74 |
|
75 |
private DedupedList getCommunityList(String organizationId, Set<String> relationTarget) { |
|
75 |
private DedupedList getCommunityList(String organizationId, Set<String> relationTarget, Context context) {
|
|
76 | 76 |
DedupedList communityList = new DedupedList(); |
77 |
relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(org))); |
|
78 |
communityList.addAll(organizationMap.get(organizationId)); |
|
77 |
relationTarget.stream().forEach(org -> communityList.addAll(organizationMap.get(StringUtils.substringAfter(org, "|")))); |
|
78 |
communityList.addAll(organizationMap.get(StringUtils.substringAfter(organizationId,"|"))); |
|
79 |
communityList.stream().forEach(c->context.getCounter(COUNTER_PROPAGATION,"found organization for " + c).increment(1)); |
|
79 | 80 |
return communityList; |
80 | 81 |
} |
81 | 82 |
|
... | ... | |
83 | 84 |
keyOut.set(Bytes.toBytes(org)); |
84 | 85 |
valueOut.set(Value.newInstance(resId, ORGANIZATION_COMMUNITY_TRUST, Type.fromresult).toJson()); |
85 | 86 |
context.write(keyOut,valueOut); |
86 |
context.getCounter(COUNTER_PROPAGATION, "emit for result"); |
|
87 |
context.getCounter(COUNTER_PROPAGATION, "emit for result").increment(1);
|
|
87 | 88 |
} |
88 | 89 |
|
89 | 90 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitythroughorganization/PropagationCommunityThroughOrganizationFileReducer.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization; |
2 | 2 |
|
3 |
import com.google.gson.Gson; |
|
4 | 3 |
import com.googlecode.protobuf.format.JsonFormat; |
5 |
import eu.dnetlib.data.bulktag.CommunityConfiguration; |
|
6 |
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants; |
|
8 | 4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils; |
9 | 5 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
10 | 6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.communitytoresult.CommunityToResultFileReducer; |
11 | 7 |
import eu.dnetlib.data.proto.ResultProtos; |
12 |
import org.apache.commons.lang.StringUtils; |
|
13 | 8 |
import org.apache.commons.logging.Log; |
14 | 9 |
import org.apache.commons.logging.LogFactory; |
15 | 10 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
16 |
import org.apache.hadoop.hbase.util.Bytes; |
|
17 | 11 |
import org.apache.hadoop.io.Text; |
18 | 12 |
import org.apache.hadoop.mapreduce.Reducer; |
19 | 13 |
|
... | ... | |
23 | 17 |
import java.util.Set; |
24 | 18 |
|
25 | 19 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
26 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION; |
|
27 | 20 |
|
28 | 21 |
public class PropagationCommunityThroughOrganizationFileReducer extends Reducer<ImmutableBytesWritable, Text, Text, Text> { |
29 | 22 |
|
... | ... | |
51 | 44 |
|
52 | 45 |
while(it.hasNext()){ |
53 | 46 |
Value v = Value.fromJson(it.next().toString()); |
54 |
if(v.getType() == PropagationConstants.Type.fromorganization){ |
|
55 |
communities.addAll(DedupedList.fromJson(v.getValue())); |
|
56 |
|
|
47 |
switch (v.getType()){ |
|
48 |
case fromorganization: |
|
49 |
communities.addAll(DedupedList.fromJson(v.getValue())); |
|
50 |
break; |
|
51 |
case fromresult: |
|
52 |
resultIds.add(v.getValue()); |
|
53 |
break; |
|
57 | 54 |
} |
58 | 55 |
|
59 |
if(v.getType() == PropagationConstants.Type.fromresult){ |
|
60 |
resultIds.add(v.getValue()); |
|
61 |
} |
|
62 | 56 |
|
63 | 57 |
} |
64 | 58 |
|
65 |
|
|
66 |
resultIds.stream().forEach(result -> { |
|
67 |
try { |
|
68 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); |
|
69 |
communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME))); |
|
70 |
|
|
71 |
String keyString = Bytes.toString(key.get()); |
|
72 |
keyOut.set(keyString); |
|
73 |
|
|
74 |
outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, keyString)).getBytes()); |
|
59 |
if(communities.size() > 0){ |
|
60 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); |
|
61 |
communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME))); |
|
62 |
for(String result: resultIds){ |
|
63 |
keyOut.set(result); |
|
64 |
outValue.set(JsonFormat.printToString(Utils.getUpdate(metadata, result)).getBytes()); |
|
75 | 65 |
context.write(keyOut, outValue); |
76 | 66 |
context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size()); |
67 |
} |
|
77 | 68 |
|
69 |
} |
|
78 | 70 |
|
79 |
} catch (IOException e) { |
|
80 |
e.printStackTrace(); |
|
81 |
} catch (InterruptedException e) { |
|
82 |
e.printStackTrace(); |
|
83 |
} |
|
84 |
}); |
|
85 | 71 |
|
86 | 72 |
|
87 | 73 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/communitythroughorganization/PropagationCommunityThroughOrganizationReducer.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.communitythroughorganization; |
2 | 2 |
|
3 |
import com.google.gson.Gson; |
|
4 |
import eu.dnetlib.data.bulktag.CommunityConfiguration; |
|
5 |
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants; |
|
7 | 3 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Utils; |
8 | 4 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
9 | 5 |
import eu.dnetlib.data.proto.ResultProtos; |
10 |
import org.apache.commons.lang.StringUtils; |
|
11 | 6 |
import org.apache.commons.logging.Log; |
12 | 7 |
import org.apache.commons.logging.LogFactory; |
13 | 8 |
import org.apache.hadoop.hbase.client.Put; |
14 | 9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
15 | 10 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
16 | 11 |
import org.apache.hadoop.hbase.util.Bytes; |
17 |
import org.apache.hadoop.hbase.util.Hash; |
|
18 | 12 |
import org.apache.hadoop.io.Text; |
19 | 13 |
|
20 | 14 |
import java.io.IOException; |
... | ... | |
45 | 39 |
|
46 | 40 |
while(it.hasNext()){ |
47 | 41 |
Value v = Value.fromJson(it.next().toString()); |
48 |
if(v.getType() == PropagationConstants.Type.fromorganization){ |
|
49 |
communities.addAll(DedupedList.fromJson(v.getValue())); |
|
50 |
|
|
42 |
switch (v.getType()){ |
|
43 |
case fromorganization: |
|
44 |
communities.addAll(DedupedList.fromJson(v.getValue())); |
|
45 |
break; |
|
46 |
case fromresult: |
|
47 |
resultIds.add(v.getValue()); |
|
48 |
break; |
|
51 | 49 |
} |
52 | 50 |
|
53 |
if(v.getType() == PropagationConstants.Type.fromresult){ |
|
54 |
resultIds.add(v.getValue()); |
|
55 |
} |
|
56 | 51 |
|
57 | 52 |
} |
58 | 53 |
|
59 |
|
|
60 |
resultIds.stream().forEach(result -> { |
|
61 |
try { |
|
62 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); |
|
63 |
communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE, CLASS_ORGANIZATION_NAME))); |
|
54 |
if(communities.size() > 0){ |
|
55 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder(); |
|
56 |
communities.stream().forEach(community->metadata.addContext(Utils.getContext(community, ORGANIZATION_COMMUNITY_TRUST, CLASS_ORGANIZATION_ID, DATA_INFO_TYPE,CLASS_ORGANIZATION_NAME))); |
|
57 |
for(String result: resultIds){ |
|
64 | 58 |
final Put put = new Put(Bytes.toBytes(result)).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), Utils.getUpdate(metadata, result).toByteArray()); |
65 | 59 |
keyOut.set(Bytes.toBytes(result)); |
66 | 60 |
context.write(keyOut, put); |
67 | 61 |
context.getCounter(COUNTER_PROPAGATION, "added community to result").increment(communities.size()); |
68 |
} catch (IOException e) { |
|
69 |
e.printStackTrace(); |
|
70 |
} catch (InterruptedException e) { |
|
71 |
e.printStackTrace(); |
|
72 | 62 |
} |
73 |
}); |
|
74 | 63 |
|
64 |
} |
|
75 | 65 |
|
76 |
|
|
77 |
|
|
78 |
|
|
79 |
|
|
80 | 66 |
} |
81 | 67 |
|
82 | 68 |
|
Also available in: Unified diff
final logic of propagation of community through organization (products belonging to a given organization will be associated with the community)