Revision 53094
Added by Claudio Atzori over 5 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationCountryInstitutionalOrganizationMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
4 |
import eu.dnetlib.data.proto.*; |
|
5 |
import org.apache.commons.collections.MapUtils; |
|
6 |
import org.apache.hadoop.hbase.client.Result; |
|
7 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
8 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
9 |
import org.apache.hadoop.hbase.util.Bytes; |
|
10 |
|
|
11 |
import java.io.IOException; |
|
12 |
import java.util.Map; |
|
13 |
import java.util.stream.Collectors; |
|
14 |
|
|
15 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
16 |
|
|
17 |
/** |
|
18 |
* Created by miriam on 17/08/2018. |
|
19 |
*/ |
|
20 |
public class PropagationCountryInstitutionalOrganizationMapper extends TableMapper<Key, ImmutableBytesWritable> { |
|
21 |
|
|
22 |
|
|
23 |
private ImmutableBytesWritable valueOut; |
|
24 |
|
|
25 |
|
|
26 |
@Override |
|
27 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
28 |
super.setup(context); |
|
29 |
|
|
30 |
valueOut = new ImmutableBytesWritable(); |
|
31 |
} |
|
32 |
|
|
33 |
@Override |
|
34 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
35 |
|
|
36 |
final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result")); |
|
37 |
final byte[] body = resultMap.get(Bytes.toBytes("body")); |
|
38 |
if (body != null){ |
|
39 |
OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body); |
|
40 |
OafProtos.OafEntity entity = oaf.getEntity(); |
|
41 |
switch (OafRowKeyDecoder.decode(keyIn.copyBytes()).getType()) { |
|
42 |
case datasource: |
|
43 |
DatasourceProtos.Datasource datasource = entity.getDatasource(); |
|
44 |
if (datasource == null){ |
|
45 |
throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto"); |
|
46 |
} |
|
47 |
final Key datasource1 = Key.datasource(entity.getId()); |
|
48 |
if(datasource.getMetadata().getDatasourcetype().getClassid().equals("pubsrepository::institutional")){ |
|
49 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
50 |
valueOut.set(ONE.getBytes()); |
|
51 |
context.write(datasource1,valueOut); |
|
52 |
}else{ |
|
53 |
context.getCounter(COUNTER_PROPAGATION," not institutional datasource").increment(1); |
|
54 |
valueOut.set(ZERO.getBytes()); |
|
55 |
context.write(datasource1,valueOut); |
|
56 |
} |
|
57 |
break; |
|
58 |
case organization: |
|
59 |
OrganizationProtos.Organization organization = entity.getOrganization(); |
|
60 |
if (organization == null){ |
|
61 |
throw new RuntimeException("oaf type is organization, but organization proto is not found in oafproto"); |
|
62 |
} |
|
63 |
|
|
64 |
FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry(); |
|
65 |
if (country == null){ |
|
66 |
context.getCounter(COUNTER_PROPAGATION,"country elem does not exists").increment(1); |
|
67 |
}else{ |
|
68 |
final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes()); |
|
69 |
if (MapUtils.isNotEmpty(ds_org)) { |
|
70 |
|
|
71 |
for(String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) { |
|
72 |
valueOut.set(country.getClassid().getBytes()); |
|
73 |
context.write(Key.organization(dsId), valueOut); |
|
74 |
context.getCounter(COUNTER_PROPAGATION,"country ").increment(1); |
|
75 |
} |
|
76 |
} |
|
77 |
} |
|
78 |
break; |
|
79 |
case result: |
|
80 |
ResultProtos.Result result = entity.getResult(); |
|
81 |
|
|
82 |
for(ResultProtos.Result.Instance instance : result.getInstanceList()) { |
|
83 |
//todo add check if key is not empty and field is not null |
|
84 |
|
|
85 |
String hostedBy = instance.getHostedby().getKey(); |
|
86 |
valueOut.set(entity.getId().getBytes()); |
|
87 |
context.write(Key.publication(hostedBy), valueOut); |
|
88 |
|
|
89 |
String collectedFrom = instance.getCollectedfrom().getKey(); |
|
90 |
if (!hostedBy.equals(collectedFrom)) { |
|
91 |
context.write(Key.publication(collectedFrom),valueOut); |
|
92 |
} |
|
93 |
} |
|
94 |
break; |
|
95 |
} |
|
96 |
} |
|
97 |
|
|
98 |
|
|
99 |
} |
|
100 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationCountryInstitutionalOrganizationReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Iterator; |
|
5 |
|
|
6 |
|
|
7 |
import eu.dnetlib.data.proto.*; |
|
8 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
9 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
10 |
import org.apache.hadoop.hbase.util.Bytes; |
|
11 |
import org.apache.hadoop.hbase.client.Put; |
|
12 |
|
|
13 |
import org.apache.commons.logging.Log; |
|
14 |
import org.apache.commons.logging.LogFactory; |
|
15 |
|
|
16 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
17 |
|
|
18 |
public class PropagationCountryInstitutionalOrganizationReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable>{ |
|
19 |
|
|
20 |
private static final Log log = LogFactory.getLog(PropagationCountryInstitutionalOrganizationReducer.class); |
|
21 |
|
|
22 |
final static String DNETCOUNTRYSCHEMA = "dnet:countries"; |
|
23 |
|
|
24 |
private ImmutableBytesWritable keyOut; |
|
25 |
|
|
26 |
@Override |
|
27 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
28 |
super.setup(context); |
|
29 |
keyOut = new ImmutableBytesWritable(); |
|
30 |
} |
|
31 |
|
|
32 |
@Override |
|
33 |
protected void reduce(final Key dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException { |
|
34 |
|
|
35 |
final Iterator<ImmutableBytesWritable> it = values.iterator(); |
|
36 |
if(!it.hasNext()){ |
|
37 |
context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1); |
|
38 |
return; |
|
39 |
} |
|
40 |
final String first = Bytes.toString(it.next().get()); |
|
41 |
if (!(first.equals(ZERO) || first.equals(ONE))) { |
|
42 |
throw new RuntimeException("First Element in reducer is not type of datasource"); |
|
43 |
} |
|
44 |
|
|
45 |
//ensure we are dealing with an institutional repository |
|
46 |
if (first.equals(ONE)) { |
|
47 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
48 |
if(!it.hasNext()){ |
|
49 |
context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1); |
|
50 |
return; |
|
51 |
} |
|
52 |
final String country = Bytes.toString(it.next().get()); // need to update the information for the country to each element in the iterator |
|
53 |
if(country.trim().length() != 2){ |
|
54 |
throw new RuntimeException("Second Element in reducer is not country " + country); |
|
55 |
} |
|
56 |
|
|
57 |
while(it.hasNext()) { |
|
58 |
byte[] targetRowKey = it.next().get(); |
|
59 |
String resultId = Bytes.toString(targetRowKey); |
|
60 |
if (!resultId.startsWith(("50|"))) { |
|
61 |
context.getCounter(COUNTER_PROPAGATION,"resultId expected in ordering was not found").increment(1); |
|
62 |
//throw new RuntimeException("Problem in ordering. Elements different from publication in list after second position " + targetRowKey); |
|
63 |
} |
|
64 |
byte[] oafUpdate = getOafCountry(resultId, country); |
|
65 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate); |
|
66 |
|
|
67 |
keyOut.set(targetRowKey); |
|
68 |
context.write(keyOut, put); |
|
69 |
context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1); |
|
70 |
} |
|
71 |
} else { |
|
72 |
context.getCounter(COUNTER_PROPAGATION,"not institutional datasource").increment(1); |
|
73 |
} |
|
74 |
|
|
75 |
} |
|
76 |
|
|
77 |
private byte[] getOafCountry(String resultId, String countryValue) { |
|
78 |
|
|
79 |
final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder() |
|
80 |
.setClassid(countryValue) |
|
81 |
.setClassname(countryValue) |
|
82 |
.setSchemeid(DNETCOUNTRYSCHEMA) |
|
83 |
.setSchemename(DNETCOUNTRYSCHEMA); |
|
84 |
|
|
85 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country); |
|
86 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
|
87 |
final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder() |
|
88 |
.setType(TypeProtos.Type.result) |
|
89 |
.setId(resultId) |
|
90 |
.setResult(result); |
|
91 |
|
|
92 |
return OafProtos.Oaf.newBuilder() |
|
93 |
.setKind(KindProtos.Kind.entity) |
|
94 |
.setEntity(entity) |
|
95 |
.build() |
|
96 |
.toByteArray(); |
|
97 |
} |
|
98 |
|
|
99 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationCountryFromDsOrgResultReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Iterator; |
|
5 |
|
|
6 |
|
|
7 |
import eu.dnetlib.data.proto.*; |
|
8 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
9 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
10 |
import org.apache.hadoop.hbase.util.Bytes; |
|
11 |
import org.apache.hadoop.hbase.client.Put; |
|
12 |
|
|
13 |
import org.apache.commons.logging.Log; |
|
14 |
import org.apache.commons.logging.LogFactory; |
|
15 |
|
|
16 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
17 |
|
|
18 |
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable>{ |
|
19 |
|
|
20 |
private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class); |
|
21 |
|
|
22 |
final static String DNETCOUNTRYSCHEMA = "dnet:countries"; |
|
23 |
|
|
24 |
private ImmutableBytesWritable keyOut; |
|
25 |
|
|
26 |
@Override |
|
27 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
28 |
super.setup(context); |
|
29 |
keyOut = new ImmutableBytesWritable(); |
|
30 |
} |
|
31 |
|
|
32 |
@Override |
|
33 |
protected void reduce(final Key dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException { |
|
34 |
|
|
35 |
final Iterator<ImmutableBytesWritable> it = values.iterator(); |
|
36 |
if(!it.hasNext()){ |
|
37 |
context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1); |
|
38 |
return; |
|
39 |
} |
|
40 |
final String first = Bytes.toString(it.next().get()); |
|
41 |
if (!(first.equals(ZERO) || first.equals(ONE))) { |
|
42 |
throw new RuntimeException("First Element in reducer is not type of datasource"); |
|
43 |
} |
|
44 |
|
|
45 |
//ensure we are dealing with an institutional repository |
|
46 |
if (first.equals(ONE)) { |
|
47 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
48 |
if(!it.hasNext()){ |
|
49 |
context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1); |
|
50 |
return; |
|
51 |
} |
|
52 |
final String country = Bytes.toString(it.next().get()); // need to update the information for the country to each element in the iterator |
|
53 |
if(country.trim().length() != 2){ |
|
54 |
throw new RuntimeException("Second Element in reducer is not country " + country); |
|
55 |
} |
|
56 |
|
|
57 |
while(it.hasNext()) { |
|
58 |
byte[] targetRowKey = it.next().get(); |
|
59 |
String resultId = Bytes.toString(targetRowKey); |
|
60 |
if (!resultId.startsWith(("50|"))) { |
|
61 |
context.getCounter(COUNTER_PROPAGATION,"resultId expected in ordering was not found").increment(1); |
|
62 |
//throw new RuntimeException("Problem in ordering. Elements different from publication in list after second position " + targetRowKey); |
|
63 |
} |
|
64 |
byte[] oafUpdate = getOafCountry(resultId, country); |
|
65 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate); |
|
66 |
|
|
67 |
keyOut.set(targetRowKey); |
|
68 |
context.write(keyOut, put); |
|
69 |
context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1); |
|
70 |
} |
|
71 |
} else { |
|
72 |
context.getCounter(COUNTER_PROPAGATION,"not institutional datasource").increment(1); |
|
73 |
} |
|
74 |
|
|
75 |
} |
|
76 |
|
|
77 |
private byte[] getOafCountry(String resultId, String countryValue) { |
|
78 |
|
|
79 |
final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder() |
|
80 |
.setClassid(countryValue) |
|
81 |
.setClassname(countryValue) |
|
82 |
.setSchemeid(DNETCOUNTRYSCHEMA) |
|
83 |
.setSchemename(DNETCOUNTRYSCHEMA); |
|
84 |
|
|
85 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country); |
|
86 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
|
87 |
final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder() |
|
88 |
.setType(TypeProtos.Type.result) |
|
89 |
.setId(resultId) |
|
90 |
.setResult(result); |
|
91 |
|
|
92 |
return OafProtos.Oaf.newBuilder() |
|
93 |
.setKind(KindProtos.Kind.entity) |
|
94 |
.setEntity(entity) |
|
95 |
.build() |
|
96 |
.toByteArray(); |
|
97 |
} |
|
98 |
|
|
99 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/PropagationCountryFromDsOrgResultMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation; |
|
2 |
|
|
3 |
import com.google.common.base.Splitter; |
|
4 |
import com.google.common.collect.Sets; |
|
5 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
6 |
import eu.dnetlib.data.proto.*; |
|
7 |
import org.apache.commons.collections.MapUtils; |
|
8 |
import org.apache.hadoop.hbase.client.Result; |
|
9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
10 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
11 |
import org.apache.hadoop.hbase.util.Bytes; |
|
12 |
|
|
13 |
import java.io.IOException; |
|
14 |
import java.util.Map; |
|
15 |
import java.util.Set; |
|
16 |
import java.util.stream.Collectors; |
|
17 |
|
|
18 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
|
19 |
|
|
20 |
/** |
|
21 |
* Created by miriam on 17/08/2018. |
|
22 |
*/ |
|
23 |
public class PropagationCountryFromDsOrgResultMapper extends TableMapper<Key, ImmutableBytesWritable> { |
|
24 |
|
|
25 |
|
|
26 |
private ImmutableBytesWritable valueOut; |
|
27 |
|
|
28 |
private Set<String> datasourceTypes = Sets.newHashSet("pubsrepository::institutional"); |
|
29 |
|
|
30 |
@Override |
|
31 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
32 |
super.setup(context); |
|
33 |
|
|
34 |
valueOut = new ImmutableBytesWritable(); |
|
35 |
|
|
36 |
datasourceTypes.addAll(Splitter.on(",").omitEmptyStrings().splitToList(context.getConfiguration().get("datasource.types", ""))); |
|
37 |
} |
|
38 |
|
|
39 |
@Override |
|
40 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
41 |
|
|
42 |
final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result")); |
|
43 |
final byte[] body = resultMap.get(Bytes.toBytes("body")); |
|
44 |
if (body != null){ |
|
45 |
OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body); |
|
46 |
OafProtos.OafEntity entity = oaf.getEntity(); |
|
47 |
switch (OafRowKeyDecoder.decode(keyIn.copyBytes()).getType()) { |
|
48 |
case datasource: |
|
49 |
DatasourceProtos.Datasource datasource = entity.getDatasource(); |
|
50 |
if (datasource == null){ |
|
51 |
throw new RuntimeException("oaf type is datasource, but datasource proto is not found in oafproto"); |
|
52 |
} |
|
53 |
final Key datasource1 = Key.datasource(entity.getId()); |
|
54 |
String dsType = datasource.getMetadata().getDatasourcetype().getClassid(); |
|
55 |
if(datasourceTypes.contains(dsType)) { |
|
56 |
context.getCounter(COUNTER_PROPAGATION, dsType).increment(1); |
|
57 |
valueOut.set(ONE.getBytes()); |
|
58 |
context.write(datasource1,valueOut); |
|
59 |
}else{ |
|
60 |
context.getCounter(COUNTER_PROPAGATION, String.format("not %s", dsType)).increment(1); |
|
61 |
valueOut.set(ZERO.getBytes()); |
|
62 |
context.write(datasource1,valueOut); |
|
63 |
} |
|
64 |
break; |
|
65 |
case organization: |
|
66 |
OrganizationProtos.Organization organization = entity.getOrganization(); |
|
67 |
if (organization == null){ |
|
68 |
throw new RuntimeException("oaf type is organization, but organization proto is not found in oafproto"); |
|
69 |
} |
|
70 |
|
|
71 |
FieldTypeProtos.Qualifier country = organization.getMetadata().getCountry(); |
|
72 |
if (country == null){ |
|
73 |
context.getCounter(COUNTER_PROPAGATION,"country elem does not exists").increment(1); |
|
74 |
}else{ |
|
75 |
final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes()); |
|
76 |
if (MapUtils.isNotEmpty(ds_org)) { |
|
77 |
|
|
78 |
for(String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) { |
|
79 |
valueOut.set(country.getClassid().getBytes()); |
|
80 |
context.write(Key.organization(dsId), valueOut); |
|
81 |
context.getCounter(COUNTER_PROPAGATION,"country ").increment(1); |
|
82 |
} |
|
83 |
} |
|
84 |
} |
|
85 |
break; |
|
86 |
case result: |
|
87 |
ResultProtos.Result result = entity.getResult(); |
|
88 |
|
|
89 |
for(ResultProtos.Result.Instance instance : result.getInstanceList()) { |
|
90 |
//todo add check if key is not empty and field is not null |
|
91 |
|
|
92 |
String hostedBy = instance.getHostedby().getKey(); |
|
93 |
valueOut.set(entity.getId().getBytes()); |
|
94 |
context.write(Key.publication(hostedBy), valueOut); |
|
95 |
|
|
96 |
String collectedFrom = instance.getCollectedfrom().getKey(); |
|
97 |
if (!hostedBy.equals(collectedFrom)) { |
|
98 |
context.write(Key.publication(collectedFrom),valueOut); |
|
99 |
} |
|
100 |
} |
|
101 |
break; |
|
102 |
} |
|
103 |
} |
|
104 |
|
|
105 |
|
|
106 |
} |
|
107 |
} |
Also available in: Unified diff
allow configuring the datasource typologies to be considered in the propagation process