Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dli;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.google.common.collect.Lists;
7
import com.google.protobuf.InvalidProtocolBufferException;
8
import com.googlecode.protobuf.format.JsonFormat;
9
import eu.dnetlib.data.mapreduce.hbase.dli.kv.DliKey;
10
import eu.dnetlib.data.proto.dli.ScholixObjectProtos.Scholix;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.io.Text;
15
import org.apache.hadoop.mapreduce.Reducer;
16

    
17
/**
18
 * Created by claudio on 13/03/2017.
19
 */
20
public class PrepareScholixDataReducer2 extends Reducer<DliKey, ImmutableBytesWritable, Text, Text> {
21

    
22
	private static final Log log = LogFactory.getLog(PrepareScholixDataReducer2.class); // NOPMD by marko on 11/24/08 5:02 PM
23

    
24
	private Text outKey;
25

    
26
	private Text outValue;
27

    
28
	@Override
29
	protected void setup(final Context context) throws IOException, InterruptedException {
30
		outKey = new Text("");
31
		outValue = new Text();
32
	}
33

    
34
	@Override
35
	protected void reduce(DliKey key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
36

    
37
		final List<Scholix> scholixes = Lists.newArrayList();
38
		values.forEach(i ->	scholixes.add(parse(key, i).build()));
39

    
40
		System.out.println("reducing key = " + key);
41

    
42
		if (!scholixes.isEmpty()) {
43

    
44
			final Scholix source = getSource(context, scholixes);
45

    
46
			if (source == null) {
47
				context.getCounter("scholix", "missing source").increment(1);
48
				System.out.println(String.format("missing source in group of %s: %s", scholixes.size(), key.getId()));
49
				System.out.println(scholixes);
50
				return;
51
			}
52

    
53
			scholixes.forEach(t -> {
54
				if (!t.hasSource()) {
55
					final Scholix.Builder rel = Scholix.newBuilder(t);
56
					if (rel.hasTarget()) {
57
						rel.setSource(source.getSource());
58
						emit(context, JsonFormat.printToString(rel.build()));
59
					} else {
60
						context.getCounter("scholix", "missing target").increment(1);
61
					}
62
				}
63
			});
64
		}
65
	}
66

    
67
	private Scholix getSource(final Context context, final List<Scholix> scholixes) {
68
		for (int i = 0; i < scholixes.size(); i++) {
69
			final Scholix s = scholixes.get(i);
70
			if (s.hasSource()) {
71
				context.getCounter("scholix", "first source in position " + i).increment(1);
72
				context.getCounter("scholix", "group size " + scholixes.size()).increment(1);
73
				return s;
74
			}
75
		}
76
		return null;
77
	}
78

    
79
	private Scholix.Builder parse(DliKey key, final ImmutableBytesWritable value) {
80
		try {
81
			return Scholix.newBuilder(Scholix.parseFrom(value.copyBytes()));
82
		} catch (InvalidProtocolBufferException e) {
83
			throw new IllegalArgumentException(String.format("cannot parse Scholix, keytype '%s', id '%s'", key.getKeyType(), key.getId()));
84
		}
85
	}
86

    
87
	private void emit(final Context context, final String data) {
88
		outValue.set(data.getBytes());
89
		try {
90
			context.write(outKey, outValue);
91
		} catch (Exception e) {
92
			e.printStackTrace();
93
			throw new RuntimeException(e);
94
		}
95
	}
96

    
97
}
(3-3/4)