Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4

    
5
import com.googlecode.protobuf.format.JsonFormat;
6
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.proto.KindProtos.Kind;
8
import eu.dnetlib.data.proto.OafProtos.Oaf;
9
import eu.dnetlib.data.proto.TypeProtos.Type;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Reducer;
13

    
14
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.asOaf;
15

    
16
public class PrepareBrokerDataReducer extends Reducer<Text, ImmutableBytesWritable, Text, Text> {
17

    
18
	private Text outKey;
19
	private Text outValue;
20

    
21
	@Override
22
	protected void setup(final Context context) throws IOException, InterruptedException {
23
		outKey = new Text("");
24
		outValue = new Text();
25
	}
26

    
27
	@Override
28
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
29
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(key.toString());
30

    
31
		if (!keyDecoder.getType().equals(Type.result)) {
32
			return;
33
		}
34

    
35
		try {
36
			boolean entity = false;
37
			final Oaf.Builder out = Oaf.newBuilder();
38
			for(Oaf oaf : asOaf(values)) {
39
				if (Kind.entity.equals(oaf.getKind())) {
40
					if (entity == true) {
41
						context.getCounter("result", "skipped group > 1").increment(1);
42
						return;
43
					}
44
					final String resulttypeid = oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
45
					if (!"publication".equals(resulttypeid)) {
46
						context.getCounter("result", "skipped " + resulttypeid).increment(1);
47
						return;
48
					}
49
					out.mergeFrom(oaf);
50
					entity = true;
51
				} else {
52
					out.getEntityBuilder().addCachedRel(oaf.getRel());
53
				}
54
			}
55

    
56
			outValue.set(JsonFormat.printToString(out.build()));
57
			context.write(outKey, outValue);
58

    
59
		} catch (final Exception e) {
60
			context.getCounter("error", e.getClass().getName()).increment(1);
61
			throw new RuntimeException(e);
62
		}
63
	}
64

    
65
}
(5-5/10)