Project

General

Profile

1 43383 claudio.at
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2 43174 claudio.at
3
import java.io.IOException;
4
import java.util.Map;
5
6
import com.google.common.collect.Iterables;
7
import eu.dnetlib.data.mapreduce.util.DedupUtils;
8 48145 claudio.at
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9 43174 claudio.at
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10 44584 claudio.at
import eu.dnetlib.data.proto.DedupProtos.Dedup.RelName;
11 43174 claudio.at
import eu.dnetlib.data.proto.OafProtos.Oaf;
12
import eu.dnetlib.data.proto.TypeProtos.Type;
13
import org.apache.commons.collections.MapUtils;
14
import org.apache.hadoop.hbase.client.Result;
15
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
16
import org.apache.hadoop.hbase.util.Bytes;
17
18
/**
19
 * Created by claudio on 08/07/16.
20
 */
21 48145 claudio.at
public class EnrichmentMapper extends AbstractEnrichmentMapper {
22 43174 claudio.at
23
	@Override
24 48145 claudio.at
	protected String counterGroup() {
25
		return "Broker Enrichment";
26 43174 claudio.at
	}
27
28
	@Override
29
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
30
31 48145 claudio.at
		final String type = OafRowKeyDecoder.decode(key.copyBytes()).getType().toString();
32
33 44584 claudio.at
		final Map<byte[], byte[]> result = value.getFamilyMap(Bytes.toBytes(Type.result.name()));
34
		if (MapUtils.isEmpty(result)) {
35 48145 claudio.at
			context.getCounter(counterGroup(), type + ": empty family map").increment(1);
36 43174 claudio.at
			return;
37
		}
38
39 44584 claudio.at
		final Oaf body = UpdateMerger.mergeBodyUpdates(context, result);
40 43174 claudio.at
41 44584 claudio.at
		if (body == null) {
42 48145 claudio.at
			context.getCounter(counterGroup(), type + ": body null").increment(1);
43 43174 claudio.at
			return;
44
		}
45
46 44584 claudio.at
		final String mergedInCF = DedupUtils.getDedupCF_mergedIn(Type.result);
47
		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(Bytes.toBytes(mergedInCF));
48 43174 claudio.at
49 48145 claudio.at
		final byte[] outKey = getEmitKey(context, key, mergedIn);
50 44584 claudio.at
51 48145 claudio.at
		emit(context, outKey, body.toByteArray(), type);
52 44584 claudio.at
	}
53
54 48145 claudio.at
	private byte[] getEmitKey(final Context context, final ImmutableBytesWritable key, final Map<byte[], byte[]> mergedIn) {
55 44584 claudio.at
		if (MapUtils.isNotEmpty(mergedIn)) {
56
			context.getCounter(Type.result.name(), RelName.isMergedIn.name()).increment(1);
57 48145 claudio.at
			return Iterables.getOnlyElement(mergedIn.keySet());
58 44584 claudio.at
		} else {
59 48145 claudio.at
			return key.copyBytes();
60 43331 claudio.at
		}
61 44584 claudio.at
	}
62 43331 claudio.at
63 43174 claudio.at
}