Project

General

Profile

1 39222 claudio.at
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2
3 57502 sandro.lab
import com.google.gson.Gson;
4 57505 claudio.at
import com.google.protobuf.InvalidProtocolBufferException;
5 57508 claudio.at
import eu.dnetlib.data.mapreduce.util.DedupUtils;
6 57505 claudio.at
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
8 57502 sandro.lab
import eu.dnetlib.data.proto.OafProtos;
9 57505 claudio.at
import eu.dnetlib.data.proto.TypeProtos;
10 57502 sandro.lab
import eu.dnetlib.dhp.schema.oaf.Oaf;
11 57513 claudio.at
import eu.dnetlib.dhp.schema.oaf.Relation;
12 39222 claudio.at
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15 57505 claudio.at
import org.apache.hadoop.hbase.util.Bytes;
16 39222 claudio.at
import org.apache.hadoop.io.Text;
17 57513 claudio.at
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
18 39222 claudio.at
19 57502 sandro.lab
import java.io.IOException;
20
import java.util.Map;
21
import java.util.Map.Entry;
22
import java.util.NavigableMap;
23 39222 claudio.at
24 39275 claudio.at
/**
25
 * Exports Oaf objects as their json serialization.
26
 *
27
 * @author claudio
28
 *
29
 */
30 57502 sandro.lab
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> {
31 39222 claudio.at
32
	private Text keyOut;
33
34
	private Text valueOut;
35
36 57513 claudio.at
	private MultipleOutputs multipleOutputs;
37
38 57502 sandro.lab
	private Gson g;
39
40 39222 claudio.at
	@Override
41
	protected void setup(final Context context) throws IOException, InterruptedException {
42
		super.setup(context);
43
44
		keyOut = new Text();
45
		valueOut = new Text();
46 57513 claudio.at
		multipleOutputs = new MultipleOutputs(context);
47 57502 sandro.lab
		g = new Gson();
48 39222 claudio.at
	}
49
50
	@Override
51 57513 claudio.at
	protected void cleanup(Context context) throws IOException, InterruptedException {
52
		multipleOutputs.close();
53
	}
54
55
	@Override
56 39222 claudio.at
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
57
		try {
58 57505 claudio.at
			final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
59 39222 claudio.at
60 57513 claudio.at
			final OafProtos.Oaf oaf = UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(rkd.getType().toString())));
61 57505 claudio.at
62
			if (oaf == null) {
63
				return;
64 57508 claudio.at
			} else {
65
				emit(context, oaf);
66 57505 claudio.at
			}
67
68
			final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
69
70 39222 claudio.at
			for (byte[] cf : row.keySet()) {
71
72
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
73 57513 claudio.at
					final String key = new String(q.getKey());
74
					if("update".startsWith(key) || DedupUtils.BODY_S.equals(key)) {
75 57505 claudio.at
						continue;
76
					}
77 57508 claudio.at
					if (new String(q.getValue()).equals("")) {
78
						context.getCounter("export", "skipped relation " + new String(cf)).increment(1);
79
					} else {
80
						emit(context, OafProtos.Oaf.parseFrom(q.getValue()));
81 57502 sandro.lab
					}
82 39222 claudio.at
				}
83
			}
84
		} catch (final Throwable e) {
85 57505 claudio.at
			context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
86 39222 claudio.at
			throw new RuntimeException(e);
87
		}
88
	}
89 57505 claudio.at
90 57508 claudio.at
	private void emit(Context context, OafProtos.Oaf oaf) throws IOException, InterruptedException {
91
		Oaf result = null;
92
		try {
93
			result = ProtoConverter.convert(oaf);
94
		} catch (Throwable e) {
95
			context.getCounter("export", "error:" + e.getClass().getName()).increment(1);
96
		}
97
		if (result != null) {
98
			keyOut.set(result.getClass().getName());
99
			valueOut.set(g.toJson(result));
100
101 57513 claudio.at
			final String namedOutput = result.getClass().getSimpleName().toLowerCase();
102
			multipleOutputs.write(namedOutput, keyOut, valueOut, namedOutput + "/" + namedOutput);
103
104 57508 claudio.at
			context.getCounter("export", result.getClass().getName()).increment(1);
105
		}
106 39222 claudio.at
	}
107 57505 claudio.at
108 39222 claudio.at
}