Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.stream.Collectors;
7

    
8
import com.google.common.collect.Iterables;
9
import eu.dnetlib.data.mapreduce.util.DedupUtils;
10
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
11
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
12
import eu.dnetlib.data.proto.DedupProtos.Dedup.RelName;
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import org.apache.commons.collections.MapUtils;
16
import org.apache.hadoop.hbase.client.Result;
17
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
18
import org.apache.hadoop.hbase.util.Bytes;
19

    
20
/**
21
 * Created by claudio on 08/07/16.
22
 */
23
public class EnrichmentMapper extends AbstractEnrichmentMapper {
24

    
25
	@Override
26
	protected String counterGroup() {
27
		return "Broker Enrichment";
28
	}
29

    
30
	@Override
31
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
32

    
33
		final String type = OafRowKeyDecoder.decode(key.copyBytes()).getType().toString();
34

    
35
		final Map<byte[], byte[]> result = value.getFamilyMap(Bytes.toBytes(Type.result.name()));
36
		if (MapUtils.isEmpty(result)) {
37
			context.getCounter(counterGroup(), type + ": empty family map").increment(1);
38
			return;
39
		}
40

    
41
		final Oaf body = UpdateMerger.mergeBodyUpdates(context, result);
42

    
43
		if (body == null) {
44
			context.getCounter(counterGroup(), type + ": body null").increment(1);
45
			return;
46
		}
47

    
48
		final String mergedInCF = DedupUtils.getDedupCF_mergedIn(Type.result);
49
		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(Bytes.toBytes(mergedInCF));
50

    
51
		final byte[] outKey = getEmitKey(context, key, mergedIn);
52

    
53
		emit(context, outKey, body.toByteArray(), type);
54
	}
55

    
56
}
(3-3/8)