Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup.experiment;
2

    
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.List;
6

    
7
import com.google.common.base.Function;
8
import com.google.common.base.Joiner;
9
import com.google.common.collect.Iterables;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
13
import eu.dnetlib.data.proto.PersonProtos;
14
import org.apache.commons.lang.StringUtils;
15
import org.apache.hadoop.hbase.client.Result;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
18
import org.apache.hadoop.io.NullWritable;
19

    
20
/**
21
 * builds map {merged author -> anchorId}
22
 *
23
 * @author claudio
24
 *
25
 */
26
public class AnchorStatsMapper extends TableMapper<NullWritable, NullWritable> {
27

    
28
	@Override
29
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
30

    
31

    
32
		final byte[] body = value.getValue("person".getBytes(), DedupUtils.BODY_B);
33

    
34
		if (body != null) {
35
			try {
36
				final OafDecoder decoder = OafDecoder.decode(body);
37

    
38
				final PersonProtos.Person p = decoder.getEntity().getPerson();
39

    
40
				if (!p.getAnchor()) {
41
					context.getCounter("person", "not anchor").increment(1);
42
					return;
43
				}
44

    
45
				trackPersonInfo(p.getMergedpersonCount(), context, "person merged");
46
				trackPersonInfo(p.getCoauthorCount(), context, "person coauthors");
47

    
48
			} catch (final Throwable e) {
49
				System.out.println("GOT EX " + e);
50
				//e.printStackTrace(System.err);
51
				context.getCounter("error", e.getClass().toString()).increment(1);
52
			}
53
		} else {
54
			context.getCounter("person", "missing body").increment(1);
55
		}
56
	}
57

    
58
	private void trackPersonInfo(final int count, final Context context, final String counterName) {
59

    
60
		if (count > 0 && count <= 10) {
61
			context.getCounter(counterName, count + "").increment(1);
62
			return;
63
		}
64

    
65
		if (count > 10 && count <= 20) {
66
			context.getCounter(counterName, "[10, 20)").increment(1);
67
			return;
68
		}
69

    
70
		if (count > 20 && count <= 30) {
71
			context.getCounter(counterName, "[20, 30)").increment(1);
72
			return;
73
		}
74

    
75
		if (count > 30 && count <= 40) {
76
			context.getCounter(counterName, "[30, 40)").increment(1);
77
			return;
78
		}
79

    
80
		if (count > 40 && count <= 50) {
81
			context.getCounter(counterName, "[40, 50)").increment(1);
82
			return;
83
		}
84

    
85
		if (count > 50 && count <= 70) {
86
			context.getCounter(counterName, "[50, 70)").increment(1);
87
			return;
88
		}
89

    
90
		if (count > 70 && count <= 100) {
91
			context.getCounter(counterName, "[70, 100)").increment(1);
92
			return;
93
		}
94

    
95
		if (count > 100) {
96
			context.getCounter(counterName, "[100, *)").increment(1);
97
			return;
98
		}
99

    
100
	}
101

    
102
}
(1-1/9)