Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import eu.dnetlib.data.mapreduce.util.OafDecoder;
4
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.proto.KindProtos;
7
import eu.dnetlib.data.proto.OafProtos;
8
import eu.dnetlib.data.proto.TypeProtos;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.io.NullWritable;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Counter;
13
import org.apache.hadoop.mapreduce.Reducer;
14

    
15
public class InfospaceCountsReducer extends Reducer<Text, ImmutableBytesWritable, NullWritable, NullWritable> {
16

    
17
	public static final String ENTITY = KindProtos.Kind.entity.toString();
18

    
19
	@Override
20
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
21
		try {
22
			final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(key.toString());
23
			for (final ImmutableBytesWritable bytes : values) {
24
				final OafDecoder decoder = OafHbaseUtils.decode(bytes);
25
				final TypeProtos.Type type = keyDecoder.getType();
26

    
27
				final OafProtos.Oaf oaf = decoder.getOaf();
28

    
29
				switch (decoder.getKind()) {
30
					case entity:
31
						if (deletedByInference(oaf)) {
32
							if (isInvisible(oaf)) {
33
								incrementCounter(context, ENTITY, String.format("%s (deleted true / invisible true)", getEntityType(oaf, type)), 1);
34
							} else {
35
								incrementCounter(context, ENTITY, String.format("%s (deleted true / invisible false)", getEntityType(oaf, type)), 1);
36
							}
37
						} else {
38

    
39
							if (isInvisible(oaf)) {
40
								incrementCounter(context, ENTITY, String.format("%s (deleted false / invisible true)", getEntityType(oaf, type)), 1);
41
							} else {
42
								incrementCounter(context, ENTITY, String.format("%s (deleted false / invisible false)", getEntityType(oaf, type)), 1);
43
							}
44
						}
45
						break;
46
					case relation:
47
						if (deletedByInference(oaf)) {
48
							incrementCounter(context, String.format("%s (deleted true)", ENTITY), decoder.getCFQ(), 1);
49
						} else {
50
							incrementCounter(context, String.format("%s (deleted false)", ENTITY), decoder.getCFQ(), 1);
51
						}
52
						break;
53
					default:
54
						throw new IllegalArgumentException("unknow type: " + decoder.getKind());
55
				}
56
			}
57
		} catch (final Throwable e) {
58
			context.getCounter("error", e.getClass().getName()).increment(1);
59
			throw new RuntimeException(e);
60
		}
61
	}
62

    
63
	private void incrementCounter(final Reducer.Context context, final String k, final String t, final int n) {
64
		getCounter(context, k, t).increment(n);
65
	}
66

    
67
	private Counter getCounter(final Reducer.Context context, final String k, final String t) {
68
		return context.getCounter(k, t);
69
	}
70

    
71
	private String getEntityType(final OafProtos.Oaf oaf, final TypeProtos.Type type) {
72
		switch (type) {
73
			case result:
74
				return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
75
			default:
76
				return type.toString();
77
		}
78
	}
79

    
80
	private boolean deletedByInference(final OafProtos.Oaf oaf) {
81
		return oaf.getDataInfo().getDeletedbyinference();
82
	}
83

    
84
	private boolean isInvisible(final OafProtos.Oaf oaf) {
85
		return oaf.getDataInfo().getInvisible();
86
	}
87

    
88
}
(6-6/8)