Project

General

Profile

1 39222 claudio.at
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
6
import java.util.NavigableMap;
7
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.io.Text;
14
15
import com.google.common.base.Joiner;
16
import com.googlecode.protobuf.format.JsonFormat;
17
18
import eu.dnetlib.data.mapreduce.util.OafDecoder;
19
20 39275 claudio.at
/**
21
 * Exports Oaf objects as their json serialization.
22
 *
23
 * @author claudio
24
 *
25
 */
26 39222 claudio.at
public class ExportInformationSpaceMapper extends TableMapper<Text, Text> {
27
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
32
33
	private static final String SEPARATOR = "@";
34
35
	private Text keyOut;
36
37
	private Text valueOut;
38
39
	@Override
40
	protected void setup(final Context context) throws IOException, InterruptedException {
41
		super.setup(context);
42
43
		keyOut = new Text();
44
		valueOut = new Text();
45
	}
46
47
	@Override
48
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
49
		try {
50
			byte[] rowKey = keyIn.copyBytes();
51
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
52
53
			for (byte[] cf : row.keySet()) {
54
55
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
56
57
					emit(rowKey, cf, q.getKey(), q.getValue(), context);
58
				}
59
			}
60
		} catch (final Throwable e) {
61
			log.error("error exporting the following record from HBase: " + value.toString(), e);
62
			context.getCounter("error", e.getClass().getName()).increment(1);
63
			throw new RuntimeException(e);
64
		}
65
	}
66
67
	private void emit(final byte[] rowKey, final byte[] cf, final byte[] q, final byte[] value, final Context context) throws IOException, InterruptedException {
68
69
		keyOut.set(Joiner.on(SEPARATOR).join(new String(rowKey), new String(cf), new String(q)));
70
71
		if ((value == null) || (value.length == 0)) {
72
			valueOut.set("");
73
		} else {
74 42590 claudio.at
			valueOut.set(new JsonFormat().printToString(OafDecoder.decode(value).getOaf()));
75 39222 claudio.at
		}
76
		context.write(keyOut, valueOut);
77
	}
78
79
}