Project

General

Profile

1
package eu.dnetlib.data.mapreduce.es;
2

    
3
import java.io.IOException;
4

    
5
import org.apache.hadoop.hbase.client.Result;
6
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
7
import org.apache.hadoop.hbase.mapreduce.TableMapper;
8
import org.apache.hadoop.hbase.util.Bytes;
9
import org.apache.hadoop.io.BytesWritable;
10
import org.apache.hadoop.io.NullWritable;
11

    
12
import com.google.protobuf.InvalidProtocolBufferException;
13
import com.googlecode.protobuf.format.JsonFormat;
14

    
15
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
16
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
17
import eu.dnetlib.data.proto.OafProtos.Oaf;
18
import eu.dnetlib.data.proto.TypeProtos.Type;
19

    
20
public class ElasticsearchFeedMapper extends TableMapper<NullWritable, BytesWritable> {
21

    
22
	@Override
23
	protected void setup(final Context context) throws IOException, InterruptedException {
24

    
25
	}
26

    
27
	@Override
28
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
29

    
30
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
31

    
32
		final Oaf oaf = mergeUpdates(value, context, keyDecoder.getType(), keyDecoder);
33

    
34
		if (isValid(oaf)) {
35

    
36
			context.write(NullWritable.get(), new BytesWritable(Bytes.toBytes(new JsonFormat().printToString(oaf))));
37
		}
38
	}
39

    
40
	private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder)
41
			throws InvalidProtocolBufferException {
42
		try {
43
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
44
		} catch (final InvalidProtocolBufferException e) {
45
			System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey()));
46
			throw e;
47
		}
48
	}
49

    
50
	private boolean isValid(final Oaf oaf) {
51
		return (oaf != null) && oaf.isInitialized();
52
	}
53

    
54
}
    (1-1/1)