Project

General

Profile

1 46234 claudio.at
package eu.dnetlib.data.mapreduce.hbase.dli;
2
3
import java.io.IOException;
4 46317 claudio.at
import java.util.Arrays;
5 46234 claudio.at
import java.util.Iterator;
6 46317 claudio.at
import java.util.LinkedList;
7 46311 claudio.at
import java.util.concurrent.atomic.AtomicInteger;
8 46234 claudio.at
9 46317 claudio.at
import com.google.common.base.Splitter;
10
import com.google.common.collect.Lists;
11 46250 claudio.at
import com.google.protobuf.InvalidProtocolBufferException;
12
import com.googlecode.protobuf.format.JsonFormat;
13 46275 claudio.at
import eu.dnetlib.data.mapreduce.hbase.dli.kv.DliKey;
14 46250 claudio.at
import eu.dnetlib.data.proto.dli.ScholixObjectProtos.Scholix;
15 46234 claudio.at
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17 46294 claudio.at
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
18 46234 claudio.at
import org.apache.hadoop.io.Text;
19
import org.apache.hadoop.mapreduce.Reducer;
20
21
/**
22
 * Created by claudio on 13/03/2017.
23
 */
24 46294 claudio.at
public class PrepareScholixDataReducer extends Reducer<DliKey, ImmutableBytesWritable, Text, Text> {
25 46234 claudio.at
26 46250 claudio.at
	private static final Log log = LogFactory.getLog(PrepareScholixDataReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
27 46234 claudio.at
28 46250 claudio.at
	private Text outKey;
29
30
	private Text outValue;
31
32 46234 claudio.at
	@Override
33 46250 claudio.at
	protected void setup(final Context context) throws IOException, InterruptedException {
34
		outKey = new Text("");
35
		outValue = new Text();
36
	}
37 46234 claudio.at
38 46250 claudio.at
	@Override
39 46294 claudio.at
	protected void reduce(DliKey key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
40 46250 claudio.at
41 46311 claudio.at
		//System.out.println("reducing key = " + key);
42
43 46294 claudio.at
		final Iterator<ImmutableBytesWritable> it = values.iterator();
44
		final ImmutableBytesWritable first = it.next();
45 46275 claudio.at
		final Scholix source = parse(key, first).build();
46 46311 claudio.at
		final AtomicInteger groupSize = new AtomicInteger(1);
47 46270 claudio.at
		if (!source.hasSource()) {
48
			context.getCounter("scholix", "missing source").increment(1);
49
			return;
50 46234 claudio.at
		}
51
52
		it.forEachRemaining(t -> {
53 46311 claudio.at
			groupSize.incrementAndGet();
54 46275 claudio.at
			final Scholix.Builder rel = parse(key, t);
55 46270 claudio.at
			if (rel.hasTarget()) {
56
				rel.setSource(source.getSource());
57
				emit(context, JsonFormat.printToString(rel.build()));
58
			} else {
59
				context.getCounter("scholix", "missing target").increment(1);
60
			}
61 46234 claudio.at
		});
62 46311 claudio.at
63 46317 claudio.at
		groupSizeCounter(context, groupSize.get(),
64
				"1,1",
65
				"1,10",
66
				"10,20",
67
				"20,100",
68
				"100,200",
69
				"200,500",
70
				"500,1000",
71
				"1000,2000",
72
				"2000,5000",
73
				"5000,10000",
74
				"10000,20000",
75
				"20000,*");
76 46234 claudio.at
	}
77
78 46317 claudio.at
	private void groupSizeCounter(final Context context, final int groupSize, final String... groups) {
79
		Arrays.asList(groups).forEach(g -> {
80
			final LinkedList<String> i = Lists.newLinkedList(Splitter.on(",").split(g));
81
			int min = Integer.parseInt(i.getFirst());
82
			int max = i.getLast().equals("*") ? Integer.MAX_VALUE : Integer.parseInt(i.getLast());
83
			groupSizeCounter(context, groupSize, min, max);
84
		});
85
86
	}
87
88
	private void groupSizeCounter(final Context context, final int groupSize, Integer min, Integer max) {
89
		if (groupSize > min & groupSize <= max) {
90
			context.getCounter("scholix groups", String.format("group size (%s,%s)", min, max)).increment(1);
91
		}
92
	}
93
94 46294 claudio.at
	private Scholix.Builder parse(DliKey key, final ImmutableBytesWritable value) {
95 46250 claudio.at
		try {
96 46294 claudio.at
			return Scholix.newBuilder(Scholix.parseFrom(value.copyBytes()));
97 46250 claudio.at
		} catch (InvalidProtocolBufferException e) {
98 46275 claudio.at
			throw new IllegalArgumentException(String.format("cannot parse Scholix, keytype '%s', id '%s'", key.getKeyType(), key.getId()));
99 46250 claudio.at
		}
100 46234 claudio.at
	}
101
102 46250 claudio.at
	private void emit(final Context context, final String data) {
103
		outValue.set(data.getBytes());
104
		try {
105
			context.write(outKey, outValue);
106
		} catch (Exception e) {
107
			e.printStackTrace();
108
			throw new RuntimeException(e);
109
		}
110
	}
111 46234 claudio.at
112
}