Project

General

Profile

« Previous | Next » 

Revision 48145

integrated latest changes from dnet40

View differences:

EnrichmentMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2 2

  
3 3
import java.io.IOException;
4
import java.util.Collection;
5
import java.util.HashSet;
6 4
import java.util.Map;
7
import java.util.Map.Entry;
8
import java.util.Set;
9 5

  
10
import com.google.common.base.Function;
11
import com.google.common.base.Predicate;
12 6
import com.google.common.collect.Iterables;
13
import com.google.common.collect.Maps;
14
import com.google.common.collect.Sets;
15 7
import eu.dnetlib.data.mapreduce.util.DedupUtils;
8
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
16 9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
17
import eu.dnetlib.data.proto.DedupProtos.Dedup;
18 10
import eu.dnetlib.data.proto.DedupProtos.Dedup.RelName;
19 11
import eu.dnetlib.data.proto.OafProtos.Oaf;
20 12
import eu.dnetlib.data.proto.TypeProtos.Type;
21 13
import org.apache.commons.collections.MapUtils;
22 14
import org.apache.hadoop.hbase.client.Result;
23 15
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
24
import org.apache.hadoop.hbase.mapreduce.TableMapper;
25 16
import org.apache.hadoop.hbase.util.Bytes;
26 17

  
27 18
/**
28 19
 * Created by claudio on 08/07/16.
29 20
 */
30
public class EnrichmentMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
21
public class EnrichmentMapper extends AbstractEnrichmentMapper {
31 22

  
32
	private ImmutableBytesWritable outValue;
33

  
34
	private ImmutableBytesWritable outKey;
35

  
36 23
	@Override
37
	protected void setup(final Context context) {
38
		outKey = new ImmutableBytesWritable();
39
		outValue = new ImmutableBytesWritable();
24
	protected String counterGroup() {
25
		return "Broker Enrichment";
40 26
	}
41 27

  
42 28
	@Override
43 29
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
44 30

  
31
		final String type = OafRowKeyDecoder.decode(key.copyBytes()).getType().toString();
32

  
45 33
		final Map<byte[], byte[]> result = value.getFamilyMap(Bytes.toBytes(Type.result.name()));
46 34
		if (MapUtils.isEmpty(result)) {
47
			context.getCounter(Type.result.name(), "empty family map").increment(1);
35
			context.getCounter(counterGroup(), type + ": empty family map").increment(1);
48 36
			return;
49 37
		}
50 38

  
51 39
		final Oaf body = UpdateMerger.mergeBodyUpdates(context, result);
52 40

  
53 41
		if (body == null) {
54
			context.getCounter(Type.result.name(), "body null").increment(1);
42
			context.getCounter(counterGroup(), type + ": body null").increment(1);
55 43
			return;
56 44
		}
57 45

  
58 46
		final String mergedInCF = DedupUtils.getDedupCF_mergedIn(Type.result);
59 47
		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(Bytes.toBytes(mergedInCF));
60 48

  
61
		final String outKey = getEmitKey(context, key, mergedIn);
49
		final byte[] outKey = getEmitKey(context, key, mergedIn);
62 50

  
63
		emit(context, outKey, body);
51
		emit(context, outKey, body.toByteArray(), type);
64 52
	}
65 53

  
66
	private String getEmitKey(final Context context, final ImmutableBytesWritable key, final Map<byte[], byte[]> mergedIn) {
54
	private byte[] getEmitKey(final Context context, final ImmutableBytesWritable key, final Map<byte[], byte[]> mergedIn) {
67 55
		if (MapUtils.isNotEmpty(mergedIn)) {
68 56
			context.getCounter(Type.result.name(), RelName.isMergedIn.name()).increment(1);
69
			return getRootId(mergedIn, context);
57
			return Iterables.getOnlyElement(mergedIn.keySet());
70 58
		} else {
71
			return new String(key.copyBytes());
59
			return key.copyBytes();
72 60
		}
73 61
	}
74 62

  
75
	private void emit(final Context context, final String key, final Oaf body) throws IOException, InterruptedException {
76
		outKey.set(Bytes.toBytes(key));
77
		outValue.set(body.toByteArray());
78

  
79
		context.write(outKey, outValue);
80
	}
81

  
82
	private String getRootId(final Map<byte[], byte[]> mergedIn, Context context) {
83
		final HashSet<String> ids = Sets.newHashSet(Iterables.transform(mergedIn.keySet(), new Function<byte[], String>() {
84

  
85
			@Override
86
			public String apply(final byte[] input) {
87
				return new String(input);
88
			}
89
		}));
90

  
91
		//context.getCounter("duplicate group size", String.valueOf(ids.size())).increment(1);
92

  
93
		try {
94
			return Iterables.getOnlyElement(ids);
95
		} catch(IllegalArgumentException e) {
96
			System.err.println(ids);
97
			throw e;
98
		}
99
	}
100

  
101 63
}

Also available in: Unified diff