Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dli;
2

    
3
import java.io.IOException;
4
import java.util.Iterator;
5

    
6
import com.google.protobuf.InvalidProtocolBufferException;
7
import com.googlecode.protobuf.format.JsonFormat;
8
import eu.dnetlib.data.mapreduce.hbase.dli.key.DliKey;
9
import eu.dnetlib.data.proto.dli.ScholixObjectProtos.Scholix;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.apache.hadoop.io.Text;
13
import org.apache.hadoop.mapreduce.Reducer;
14

    
15
/**
16
 * Created by claudio on 13/03/2017.
17
 */
18
public class PrepareScholixDataReducer extends Reducer<DliKey, Text, Text, Text> {
19

    
20
	private static final Log log = LogFactory.getLog(PrepareScholixDataReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
21

    
22
	private Text outKey;
23

    
24
	private Text outValue;
25

    
26
	@Override
27
	protected void setup(final Context context) throws IOException, InterruptedException {
28
		outKey = new Text("");
29
		outValue = new Text();
30
	}
31

    
32
	@Override
33
	protected void reduce(DliKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
34

    
35
		final Iterator<Text> it = values.iterator();
36

    
37
		final Scholix source = parse(it.next()).build();
38

    
39
		if (source.hasSource() & !source.hasTarget()) {
40
			throw new IllegalStateException("sorting does not work! Expected entity as 1st element");
41
		}
42

    
43
		it.forEachRemaining(t -> {
44
			final Scholix.Builder rel = parse(t);
45
			rel.setSource(source.getSource());
46

    
47
			emit(context, JsonFormat.printToString(rel.build()));
48
		});
49
	}
50

    
51
	private Scholix.Builder parse(final Text t) {
52
		try {
53
			return Scholix.newBuilder(Scholix.parseFrom(t.getBytes()));
54
		} catch (InvalidProtocolBufferException e) {
55
			throw new IllegalArgumentException("cannot parse Scholix: " + t.toString());
56
		}
57
	}
58

    
59
	private void emit(final Context context, final String data) {
60
		outValue.set(data.getBytes());
61
		try {
62
			context.write(outKey, outValue);
63
		} catch (Exception e) {
64
			e.printStackTrace();
65
			throw new RuntimeException(e);
66
		}
67
	}
68

    
69
}
(2-2/3)