Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import com.google.gson.Gson;
4
import com.google.protobuf.InvalidProtocolBufferException;
5
import eu.dnetlib.data.mapreduce.util.DedupUtils;
6
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
8
import eu.dnetlib.data.proto.OafProtos;
9
import eu.dnetlib.data.proto.TypeProtos;
10
import eu.dnetlib.dhp.schema.oaf.Oaf;
11
import eu.dnetlib.dhp.schema.oaf.Relation;
12
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
import org.apache.hadoop.hbase.util.Bytes;
16
import org.apache.hadoop.io.Text;
17
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
18

    
19
import java.io.IOException;
20
import java.util.Map;
21
import java.util.Map.Entry;
22
import java.util.NavigableMap;
23

    
24
/**
25
 * Exports Oaf objects as their json serialization.
26
 *
27
 * @author claudio
28
 *
29
 */
30
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> {
31

    
32
	private Text keyOut;
33

    
34
	private Text valueOut;
35

    
36
	private MultipleOutputs multipleOutputs;
37

    
38
	private Gson g;
39

    
40
	@Override
41
	protected void setup(final Context context) throws IOException, InterruptedException {
42
		super.setup(context);
43

    
44
		keyOut = new Text();
45
		valueOut = new Text();
46
		multipleOutputs = new MultipleOutputs(context);
47
		g = new Gson();
48
	}
49

    
50
	@Override
51
	protected void cleanup(Context context) throws IOException, InterruptedException {
52
		multipleOutputs.close();
53
	}
54

    
55
	@Override
56
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
57
		try {
58
			final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
59

    
60
			final OafProtos.Oaf oaf = UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(rkd.getType().toString())));
61

    
62
			if (oaf == null) {
63
				return;
64
			} else {
65
				emit(context, oaf);
66
			}
67

    
68
			final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
69

    
70
			for (byte[] cf : row.keySet()) {
71

    
72
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
73
					final String key = new String(q.getKey());
74
					if(key.startsWith("update") || DedupUtils.BODY_S.equals(key)) {
75
						continue;
76
					}
77
					if (new String(q.getValue()).equals("")) {
78
						context.getCounter("export", "skipped relation " + new String(cf)).increment(1);
79
					} else {
80
						emit(context, OafProtos.Oaf.parseFrom(q.getValue()));
81
					}
82
				}
83
			}
84
		} catch (final Throwable e) {
85
			context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
86
			throw new RuntimeException(e);
87
		}
88
	}
89

    
90
	private void emit(Context context, OafProtos.Oaf oaf) throws IOException, InterruptedException {
91
		Oaf result = null;
92
		try {
93
			result = ProtoConverter.convert(oaf);
94
		} catch (Throwable e) {
95
			context.getCounter("export", "error:" + e.getClass().getName()).increment(1);
96
		}
97
		if (result != null) {
98
			keyOut.set(result.getClass().getName());
99
			valueOut.set(g.toJson(result));
100

    
101
			final String namedOutput = result.getClass().getSimpleName().toLowerCase();
102
			multipleOutputs.write(namedOutput, keyOut, valueOut, namedOutput + "/" + namedOutput);
103

    
104
			context.getCounter("export", result.getClass().getName()).increment(1);
105
		}
106
	}
107

    
108
}
(4-4/10)