Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5

    
6
import org.apache.hadoop.hbase.client.Delete;
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.mapreduce.Counter;
12

    
13
import com.google.protobuf.InvalidProtocolBufferException;
14

    
15
import eu.dnetlib.data.mapreduce.util.DedupUtils;
16
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
17
import eu.dnetlib.data.proto.KindProtos.Kind;
18
import eu.dnetlib.data.proto.OafProtos.Oaf;
19
import eu.dnetlib.data.proto.OafProtos.OafEntity;
20
import eu.dnetlib.data.proto.PersonProtos.Person;
21
import eu.dnetlib.data.proto.TypeProtos.Type;
22

    
23
/**
24
 * Removes the Non-root rows
25
 *
26
 *
27
 * @author claudio
28
 *
29
 */
30
public class GTCleanerMapper extends TableMapper<ImmutableBytesWritable, Delete> {
31

    
32
	@Override
33
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
34

    
35
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
36

    
37
		final Type type = keyDecoder.getType();
38

    
39
		if (!type.equals(Type.person)) {
40
			incrementCounter(context, "wrong entity type", type.toString(), 1);
41
			return;
42
		}
43

    
44
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
45
		final byte[] bodyB = map.get(DedupUtils.BODY_B);
46
		if (bodyB == null) {
47
			incrementCounter(context, "missing body (map)", type.toString(), 1);
48
			return;
49
		}
50

    
51
		final Oaf oaf = decodeProto(context, bodyB);
52

    
53
		if (!isValid(oaf)) {
54
			incrementCounter(context, "missing body (map)", type.toString(), 1);
55
			return;
56
		}
57

    
58
		if (mergedSize(oaf, 0) || mergedSize(oaf, 1)) {
59
			context.write(keyIn, new Delete(keyIn.copyBytes()));
60
			incrementCounter(context, Kind.entity.toString(), "deleted", 1);
61
		}
62

    
63
	}
64

    
65
	private boolean mergedSize(final Oaf oaf, final int size) {
66
		final OafEntity entity = oaf.getEntity();
67

    
68
		if (entity == null) return false;
69

    
70
		final Person person = entity.getPerson();
71

    
72
		return (person.getMergedpersonList() != null) && (person.getMergedpersonList().size() == size);
73
	}
74

    
75
	private boolean isValid(final Oaf oaf) {
76
		return (oaf != null) && oaf.isInitialized();
77
	}
78

    
79
	private Oaf decodeProto(final Context context, final byte[] body) {
80
		try {
81
			return Oaf.parseFrom(body);
82
		} catch (final InvalidProtocolBufferException e) {
83
			e.printStackTrace(System.err);
84
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
85
		}
86
		return null;
87
	}
88

    
89
	private void incrementCounter(final Context context, final String k, final String t, final int n) {
90
		getCounter(context, k, t).increment(n);
91
	}
92

    
93
	private Counter getCounter(final Context context, final String k, final String t) {
94
		return context.getCounter(k, t);
95
	}
96

    
97
}
(4-4/6)