1 |
43383
|
claudio.at
|
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
|
2 |
43174
|
claudio.at
|
|
3 |
|
|
import java.io.IOException;
|
4 |
|
|
import java.util.Map;
|
5 |
|
|
|
6 |
|
|
import com.google.common.collect.Iterables;
|
7 |
|
|
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
8 |
48145
|
claudio.at
|
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
|
9 |
43174
|
claudio.at
|
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
|
10 |
44584
|
claudio.at
|
import eu.dnetlib.data.proto.DedupProtos.Dedup.RelName;
|
11 |
43174
|
claudio.at
|
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
12 |
|
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
13 |
|
|
import org.apache.commons.collections.MapUtils;
|
14 |
|
|
import org.apache.hadoop.hbase.client.Result;
|
15 |
|
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
16 |
|
|
import org.apache.hadoop.hbase.util.Bytes;
|
17 |
|
|
|
18 |
|
|
/**
|
19 |
|
|
* Created by claudio on 08/07/16.
|
20 |
|
|
*/
|
21 |
48145
|
claudio.at
|
public class EnrichmentMapper extends AbstractEnrichmentMapper {
|
22 |
43174
|
claudio.at
|
|
23 |
|
|
@Override
|
24 |
48145
|
claudio.at
|
protected String counterGroup() {
|
25 |
|
|
return "Broker Enrichment";
|
26 |
43174
|
claudio.at
|
}
|
27 |
|
|
|
28 |
|
|
@Override
|
29 |
|
|
protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
|
30 |
|
|
|
31 |
48145
|
claudio.at
|
final String type = OafRowKeyDecoder.decode(key.copyBytes()).getType().toString();
|
32 |
|
|
|
33 |
44584
|
claudio.at
|
final Map<byte[], byte[]> result = value.getFamilyMap(Bytes.toBytes(Type.result.name()));
|
34 |
|
|
if (MapUtils.isEmpty(result)) {
|
35 |
48145
|
claudio.at
|
context.getCounter(counterGroup(), type + ": empty family map").increment(1);
|
36 |
43174
|
claudio.at
|
return;
|
37 |
|
|
}
|
38 |
|
|
|
39 |
44584
|
claudio.at
|
final Oaf body = UpdateMerger.mergeBodyUpdates(context, result);
|
40 |
43174
|
claudio.at
|
|
41 |
44584
|
claudio.at
|
if (body == null) {
|
42 |
48145
|
claudio.at
|
context.getCounter(counterGroup(), type + ": body null").increment(1);
|
43 |
43174
|
claudio.at
|
return;
|
44 |
|
|
}
|
45 |
|
|
|
46 |
44584
|
claudio.at
|
final String mergedInCF = DedupUtils.getDedupCF_mergedIn(Type.result);
|
47 |
|
|
final Map<byte[], byte[]> mergedIn = value.getFamilyMap(Bytes.toBytes(mergedInCF));
|
48 |
43174
|
claudio.at
|
|
49 |
48145
|
claudio.at
|
final byte[] outKey = getEmitKey(context, key, mergedIn);
|
50 |
44584
|
claudio.at
|
|
51 |
48145
|
claudio.at
|
emit(context, outKey, body.toByteArray(), type);
|
52 |
44584
|
claudio.at
|
}
|
53 |
|
|
|
54 |
48145
|
claudio.at
|
private byte[] getEmitKey(final Context context, final ImmutableBytesWritable key, final Map<byte[], byte[]> mergedIn) {
|
55 |
44584
|
claudio.at
|
if (MapUtils.isNotEmpty(mergedIn)) {
|
56 |
|
|
context.getCounter(Type.result.name(), RelName.isMergedIn.name()).increment(1);
|
57 |
48145
|
claudio.at
|
return Iterables.getOnlyElement(mergedIn.keySet());
|
58 |
44584
|
claudio.at
|
} else {
|
59 |
48145
|
claudio.at
|
return key.copyBytes();
|
60 |
43331
|
claudio.at
|
}
|
61 |
44584
|
claudio.at
|
}
|
62 |
43331
|
claudio.at
|
|
63 |
43174
|
claudio.at
|
}
|