Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
6
import java.util.NavigableMap;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.io.Text;
14

    
15
import com.google.common.base.Joiner;
16
import com.googlecode.protobuf.format.JsonFormat;
17

    
18
import eu.dnetlib.data.mapreduce.util.OafDecoder;
19

    
20
public class ExportInformationSpaceMapper extends TableMapper<Text, Text> {
21

    
22
	/**
23
	 * logger.
24
	 */
25
	private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
26

    
27
	private static final String SEPARATOR = "@";
28

    
29
	private Text keyOut;
30

    
31
	private Text valueOut;
32

    
33
	@Override
34
	protected void setup(final Context context) throws IOException, InterruptedException {
35
		super.setup(context);
36

    
37
		keyOut = new Text();
38
		valueOut = new Text();
39
	}
40

    
41
	@Override
42
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
43
		try {
44
			byte[] rowKey = keyIn.copyBytes();
45
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
46

    
47
			for (byte[] cf : row.keySet()) {
48

    
49
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
50

    
51
					emit(rowKey, cf, q.getKey(), q.getValue(), context);
52
				}
53
			}
54
		} catch (final Throwable e) {
55
			log.error("error exporting the following record from HBase: " + value.toString(), e);
56
			context.getCounter("error", e.getClass().getName()).increment(1);
57
			throw new RuntimeException(e);
58
		}
59
	}
60

    
61
	private void emit(final byte[] rowKey, final byte[] cf, final byte[] q, final byte[] value, final Context context) throws IOException, InterruptedException {
62

    
63
		keyOut.set(Joiner.on(SEPARATOR).join(new String(rowKey), new String(cf), new String(q)));
64

    
65
		if ((value == null) || (value.length == 0)) {
66
			valueOut.set("");
67
		} else {
68
			valueOut.set(JsonFormat.printToString(OafDecoder.decode(value).getOaf()));
69
		}
70
		context.write(keyOut, valueOut);
71
	}
72

    
73
}
(1-1/2)