Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import com.google.gson.Gson;
4
import com.google.protobuf.InvalidProtocolBufferException;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
7
import eu.dnetlib.data.proto.OafProtos;
8
import eu.dnetlib.data.proto.TypeProtos;
9
import eu.dnetlib.dhp.schema.oaf.Oaf;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.hbase.util.Bytes;
14
import org.apache.hadoop.io.Text;
15

    
16
import java.io.IOException;
17
import java.util.Map;
18
import java.util.Map.Entry;
19
import java.util.NavigableMap;
20

    
21
/**
22
 * Exports Oaf objects as their json serialization.
23
 *
24
 * @author claudio
25
 *
26
 */
27
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> {
28

    
29
	private Text keyOut;
30

    
31
	private Text valueOut;
32

    
33
	private Gson g;
34

    
35
	@Override
36
	protected void setup(final Context context) throws IOException, InterruptedException {
37
		super.setup(context);
38

    
39
		keyOut = new Text();
40
		valueOut = new Text();
41
		g = new Gson();
42
	}
43

    
44
	@Override
45
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
46
		try {
47
			final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
48

    
49
			final OafProtos.Oaf oaf = mergeUpdates(value, context, rkd.getType());
50

    
51
			if (oaf == null) {
52
				return;
53
			}
54

    
55
			final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
56

    
57
			for (byte[] cf : row.keySet()) {
58

    
59
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
60

    
61
					if(Bytes.toString(q.getKey()).startsWith("update")) {
62
						continue;
63
					}
64

    
65
					Oaf result = null;
66
					try {
67
						result = ProtoConverter.convert(oaf);
68
					} catch (Throwable e) {
69
						context.getCounter("Convert", "error" ).increment(1);
70
					}
71
					if (result != null) {
72
						emit(result.getClass().getName(), result, context);
73
					}
74
				}
75
			}
76
		} catch (final Throwable e) {
77
			context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
78
			throw new RuntimeException(e);
79
		}
80
	}
81
	private void emit(final String type,  final Oaf result, final Context context) throws IOException, InterruptedException {
82
		keyOut.set(type);
83
		valueOut.set(g.toJson(result));
84
		context.write(keyOut, valueOut);
85

    
86
		context.getCounter("export", type).increment(1);
87
	}
88

    
89
	private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type)
90
			throws InvalidProtocolBufferException {
91
		try {
92
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
93
		} catch (final InvalidProtocolBufferException e) {
94
			throw e;
95
		}
96
	}
97

    
98
}
(4-4/10)