Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5

    
6
import com.google.common.collect.Iterables;
7
import eu.dnetlib.data.mapreduce.util.DedupUtils;
8
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10
import eu.dnetlib.data.proto.DedupProtos.Dedup.RelName;
11
import eu.dnetlib.data.proto.OafProtos.Oaf;
12
import eu.dnetlib.data.proto.TypeProtos.Type;
13
import org.apache.commons.collections.MapUtils;
14
import org.apache.hadoop.hbase.client.Result;
15
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
16
import org.apache.hadoop.hbase.util.Bytes;
17

    
18
/**
19
 * Created by claudio on 08/07/16.
20
 */
21
public class EnrichmentMapper extends AbstractEnrichmentMapper {
22

    
23
	@Override
24
	protected String counterGroup() {
25
		return "Broker Enrichment";
26
	}
27

    
28
	@Override
29
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
30

    
31
		final String type = OafRowKeyDecoder.decode(key.copyBytes()).getType().toString();
32

    
33
		final Map<byte[], byte[]> result = value.getFamilyMap(Bytes.toBytes(Type.result.name()));
34
		if (MapUtils.isEmpty(result)) {
35
			context.getCounter(counterGroup(), type + ": empty family map").increment(1);
36
			return;
37
		}
38

    
39
		final Oaf body = UpdateMerger.mergeBodyUpdates(context, result);
40

    
41
		if (body == null) {
42
			context.getCounter(counterGroup(), type + ": body null").increment(1);
43
			return;
44
		}
45

    
46
		final String mergedInCF = DedupUtils.getDedupCF_mergedIn(Type.result);
47
		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(Bytes.toBytes(mergedInCF));
48

    
49
		final byte[] outKey = getEmitKey(context, key, mergedIn);
50

    
51
		emit(context, outKey, body.toByteArray(), type);
52
	}
53

    
54
	private byte[] getEmitKey(final Context context, final ImmutableBytesWritable key, final Map<byte[], byte[]> mergedIn) {
55
		if (MapUtils.isNotEmpty(mergedIn)) {
56
			context.getCounter(Type.result.name(), RelName.isMergedIn.name()).increment(1);
57
			return Iterables.getOnlyElement(mergedIn.keySet());
58
		} else {
59
			return key.copyBytes();
60
		}
61
	}
62

    
63
}
(3-3/8)