Revision 58166
Added by Claudio Atzori about 4 years ago
PropagationCountryFromDsOrgResultMapper.java | ||
---|---|---|
20 | 20 |
import java.util.Map; |
21 | 21 |
import java.util.Set; |
22 | 22 |
import java.util.stream.Collectors; |
23 |
import java.util.stream.Stream; |
|
23 | 24 |
|
24 | 25 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
25 | 26 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.*; |
... | ... | |
90 | 91 |
if (country == null) { |
91 | 92 |
context.getCounter(COUNTER_PROPAGATION, "country elem does not exists").increment(1); |
92 | 93 |
} else { |
93 |
final Map<byte[], byte[]> ds_org = value.getFamilyMap("datasourceOrganization_provision_isProvidedBy".getBytes()); |
|
94 |
if (MapUtils.isNotEmpty(ds_org)) { |
|
95 |
|
|
96 |
for (String dsId : ds_org.keySet().stream().map(String::new).collect(Collectors.toList())) { |
|
97 |
|
|
98 |
valueOut.set(Value.newInstance(country.getClassid()).toJson()); |
|
99 |
context.write( InstOrgKey.organization(dsId), valueOut); |
|
100 |
context.getCounter(COUNTER_PROPAGATION, "country ").increment(1); |
|
101 |
} |
|
102 |
} |
|
94 |
valueOut.set(Value.newInstance(country.getClassid()).toJson()); |
|
95 |
Stream.of(value.raw()) |
|
96 |
.filter(kv -> "datasourceOrganization_provision_isProvidedBy".equals(Bytes.toString(kv.getFamily()))) |
|
97 |
.map(kv -> Bytes.toString(kv.getQualifier())) |
|
98 |
.forEach(dsId -> { |
|
99 |
try { |
|
100 |
context.write( InstOrgKey.organization(dsId), valueOut); |
|
101 |
context.getCounter(COUNTER_PROPAGATION, "country").increment(1); |
|
102 |
} catch (IOException | InterruptedException e) { |
|
103 |
throw new RuntimeException(e); |
|
104 |
} |
|
105 |
}); |
|
103 | 106 |
} |
104 | 107 |
|
105 | 108 |
break; |
... | ... | |
139 | 142 |
|
140 | 143 |
|
141 | 144 |
private String getTrust(Result value) throws InvalidProtocolBufferException { |
142 |
final Map<byte[],byte[]> map = value.getFamilyMap(Bytes.toBytes("datasource")); |
|
143 |
final byte[] body = map.get(Bytes.toBytes("body")); |
|
145 |
final byte[] body = value.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body")); |
|
144 | 146 |
if (body != null){ |
145 | 147 |
OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom(body); |
146 | 148 |
return oaf.getDataInfo().getTrust(); |
Also available in: Unified diff
trying to reduce memory footprint