Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import com.google.gson.Gson;
4
import com.google.protobuf.InvalidProtocolBufferException;
5
import eu.dnetlib.data.mapreduce.util.DedupUtils;
6
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
8
import eu.dnetlib.data.proto.OafProtos;
9
import eu.dnetlib.data.proto.TypeProtos;
10
import eu.dnetlib.dhp.schema.oaf.Oaf;
11
import org.apache.hadoop.hbase.client.Result;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableMapper;
14
import org.apache.hadoop.hbase.util.Bytes;
15
import org.apache.hadoop.io.Text;
16

    
17
import java.io.IOException;
18
import java.util.Map;
19
import java.util.Map.Entry;
20
import java.util.NavigableMap;
21

    
22
/**
23
 * Exports Oaf objects as their json serialization.
24
 *
25
 * @author claudio
26
 *
27
 */
28
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> {
29

    
30
	private Text keyOut;
31

    
32
	private Text valueOut;
33

    
34
	private Gson g;
35

    
36
	@Override
37
	protected void setup(final Context context) throws IOException, InterruptedException {
38
		super.setup(context);
39

    
40
		keyOut = new Text();
41
		valueOut = new Text();
42
		g = new Gson();
43
	}
44

    
45
	@Override
46
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
47
		try {
48
			final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
49

    
50
			final OafProtos.Oaf oaf = mergeUpdates(value, context, rkd.getType());
51

    
52
			if (oaf == null) {
53
				return;
54
			} else {
55
				emit(context, oaf);
56
			}
57

    
58
			final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
59

    
60
			for (byte[] cf : row.keySet()) {
61

    
62
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
63
					if(Bytes.toString(q.getKey()).startsWith("update") || q.getKey().equals(DedupUtils.BODY_S)) {
64
						continue;
65
					}
66
					if (new String(q.getValue()).equals("")) {
67
						context.getCounter("export", "skipped relation " + new String(cf)).increment(1);
68
					} else {
69
						emit(context, OafProtos.Oaf.parseFrom(q.getValue()));
70
					}
71
				}
72
			}
73
		} catch (final Throwable e) {
74
			context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
75
			throw new RuntimeException(e);
76
		}
77
	}
78

    
79
	private void emit(Context context, OafProtos.Oaf oaf) throws IOException, InterruptedException {
80
		Oaf result = null;
81
		try {
82
			result = ProtoConverter.convert(oaf);
83
		} catch (Throwable e) {
84
			context.getCounter("export", "error:" + e.getClass().getName()).increment(1);
85
		}
86
		if (result != null) {
87
			keyOut.set(result.getClass().getName());
88
			valueOut.set(g.toJson(result));
89
			context.write(keyOut, valueOut);
90

    
91
			context.getCounter("export", result.getClass().getName()).increment(1);
92
		}
93
	}
94

    
95
	private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type)
96
			throws InvalidProtocolBufferException {
97
		try {
98
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
99
		} catch (final InvalidProtocolBufferException e) {
100
			throw e;
101
		}
102
	}
103

    
104
}
(4-4/10)