Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4

    
5
import org.apache.hadoop.hbase.util.Bytes;
6
import org.apache.hadoop.io.Text;
7
import org.apache.hadoop.mapreduce.Reducer;
8

    
9
public class CalculatePersonDistributionStep1Reducer extends Reducer<Text, Text, Text, Text> {
10

    
11
	@Override
12
	protected void setup(final Context context) throws IOException, InterruptedException {
13
		super.setup(context);
14
	}
15

    
16
	@Override
17
	protected void reduce(final Text key, final Iterable<Text> values, final Context context)
18
			throws IOException, InterruptedException {
19

    
20
		final String dsID = Bytes.toString(key.getBytes());
21

    
22
		int count = 0;
23

    
24
		String type = null;
25
		for (Text i : values) {
26
			final byte[] b = i.copyBytes();
27

    
28
			if (count == 0) {
29
				if (CalculatePersonDistributionUtils.isDsType(b)) {
30
					type = CalculatePersonDistributionUtils.getType(b);
31

    
32
					if (type.equals("pubsrepository::institutional")) {
33
						context.getCounter("REDUCER: Valid Datasource Typologies (all)", type).increment(1);
34
					} else {
35
						context.getCounter("REDUCER: Skipped Datasource Typologies", type).increment(1);
36
						return;
37
					}
38
				} else {
39
					System.err.println("Invalid record in iterable (expected dsType): " + Bytes.toString(b));
40
					context.getCounter("REDUCER: Invalid record in iterable", "expected dsType").increment(1);
41
					//context.getCounter("REDUCER: TEST 1", Bytes.toString(key.copyBytes())).increment(1);
42
					return;
43
				}
44
			} else {
45
				if (CalculatePersonDistributionUtils.isPerson(b)) {
46
					context.write(
47
							CalculatePersonDistributionUtils.prepareHdfsKey(dsID, count),
48
							CalculatePersonDistributionUtils.prepareHdfsValue(dsID, CalculatePersonDistributionUtils.getPersonName(b)));
49
				} else {
50
					System.err.println("Invalid record in iterable (expected person): " + Bytes.toString(b));
51
					context.getCounter("REDUCER: Invalid record in iterable", "expected person").increment(1);
52
					return;
53
				}
54
				if (count == 1) {
55
					context.getCounter("REDUCER: Valid Datasource Typologies (with persons)", type).increment(1);
56
				}
57
			}
58
			count++;
59
		}
60

    
61
		if (type != null && count > 0) {
62
			context.getCounter("REDUCER: Person occurrences", type).increment(count - 1);
63
		}
64
	}
65
}
(2-2/5)