Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dli;
2

    
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.Iterator;
6
import java.util.LinkedList;
7
import java.util.concurrent.atomic.AtomicInteger;
8

    
9
import com.google.common.base.Splitter;
10
import com.google.common.collect.Lists;
11
import com.google.protobuf.InvalidProtocolBufferException;
12
import com.googlecode.protobuf.format.JsonFormat;
13
import eu.dnetlib.data.mapreduce.hbase.dli.kv.DliKey;
14
import eu.dnetlib.data.proto.dli.ScholixObjectProtos.Scholix;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
18
import org.apache.hadoop.io.Text;
19
import org.apache.hadoop.mapreduce.Reducer;
20

    
21
/**
22
 * Created by claudio on 13/03/2017.
23
 */
24
public class PrepareScholixDataReducer extends Reducer<DliKey, ImmutableBytesWritable, Text, Text> {
25

    
26
	private static final Log log = LogFactory.getLog(PrepareScholixDataReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
27

    
28
	private Text outKey;
29

    
30
	private Text outValue;
31

    
32
	@Override
33
	protected void setup(final Context context) throws IOException, InterruptedException {
34
		outKey = new Text("");
35
		outValue = new Text();
36
	}
37

    
38
	@Override
39
	protected void reduce(DliKey key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
40

    
41
		//System.out.println("reducing key = " + key);
42

    
43
		final Iterator<ImmutableBytesWritable> it = values.iterator();
44
		final ImmutableBytesWritable first = it.next();
45
		final Scholix source = parse(key, first).build();
46
		final AtomicInteger groupSize = new AtomicInteger(1);
47
		if (!source.hasSource()) {
48
			context.getCounter("scholix", "missing source").increment(1);
49
			return;
50
		}
51

    
52
		it.forEachRemaining(t -> {
53
			groupSize.incrementAndGet();
54
			final Scholix.Builder rel = parse(key, t);
55
			if (rel.hasTarget()) {
56
				rel.setSource(source.getSource());
57
				emit(context, JsonFormat.printToString(rel.build()));
58
			} else {
59
				context.getCounter("scholix", "missing target").increment(1);
60
			}
61
		});
62

    
63
		groupSizeCounter(context, groupSize.get(),
64
				"1,1",
65
				"1,10",
66
				"10,20",
67
				"20,100",
68
				"100,200",
69
				"200,500",
70
				"500,1000",
71
				"1000,2000",
72
				"2000,5000",
73
				"5000,10000",
74
				"10000,20000",
75
				"20000,*");
76
	}
77

    
78
	private void groupSizeCounter(final Context context, final int groupSize, final String... groups) {
79
		Arrays.asList(groups).forEach(g -> {
80
			final LinkedList<String> i = Lists.newLinkedList(Splitter.on(",").split(g));
81
			int min = Integer.parseInt(i.getFirst());
82
			int max = i.getLast().equals("*") ? Integer.MAX_VALUE : Integer.parseInt(i.getLast());
83
			groupSizeCounter(context, groupSize, min, max);
84
		});
85

    
86
	}
87

    
88
	private void groupSizeCounter(final Context context, final int groupSize, Integer min, Integer max) {
89
		if (groupSize > min & groupSize <= max) {
90
			context.getCounter("scholix groups", String.format("group size (%s,%s)", min, max)).increment(1);
91
		}
92
	}
93

    
94
	private Scholix.Builder parse(DliKey key, final ImmutableBytesWritable value) {
95
		try {
96
			return Scholix.newBuilder(Scholix.parseFrom(value.copyBytes()));
97
		} catch (InvalidProtocolBufferException e) {
98
			throw new IllegalArgumentException(String.format("cannot parse Scholix, keytype '%s', id '%s'", key.getKeyType(), key.getId()));
99
		}
100
	}
101

    
102
	private void emit(final Context context, final String data) {
103
		outValue.set(data.getBytes());
104
		try {
105
			context.write(outKey, outValue);
106
		} catch (Exception e) {
107
			e.printStackTrace();
108
			throw new RuntimeException(e);
109
		}
110
	}
111

    
112
}
(2-2/4)