Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
6
import java.util.NavigableMap;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.io.Text;
14

    
15
import com.google.common.base.Joiner;
16
import com.googlecode.protobuf.format.JsonFormat;
17

    
18
import eu.dnetlib.data.mapreduce.util.OafDecoder;
19

    
20
/**
21
 * Exports Oaf objects as their json serialization.
22
 *
23
 * @author claudio
24
 *
25
 */
26
public class ExportInformationSpaceMapper extends TableMapper<Text, Text> {
27

    
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
32

    
33
	private static final String SEPARATOR = "@";
34

    
35
	private Text keyOut;
36

    
37
	private Text valueOut;
38

    
39
	@Override
40
	protected void setup(final Context context) throws IOException, InterruptedException {
41
		super.setup(context);
42

    
43
		keyOut = new Text();
44
		valueOut = new Text();
45
	}
46

    
47
	@Override
48
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
49
		try {
50
			byte[] rowKey = keyIn.copyBytes();
51
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
52

    
53
			for (byte[] cf : row.keySet()) {
54

    
55
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
56

    
57
					emit(rowKey, cf, q.getKey(), q.getValue(), context);
58
				}
59
			}
60
		} catch (final Throwable e) {
61
			log.error("error exporting the following record from HBase: " + value.toString(), e);
62
			context.getCounter("error", e.getClass().getName()).increment(1);
63
			throw new RuntimeException(e);
64
		}
65
	}
66

    
67
	private void emit(final byte[] rowKey, final byte[] cf, final byte[] q, final byte[] value, final Context context) throws IOException, InterruptedException {
68

    
69
		keyOut.set(Joiner.on(SEPARATOR).join(new String(rowKey), new String(cf), new String(q)));
70

    
71
		if ((value == null) || (value.length == 0)) {
72
			valueOut.set("");
73
		} else {
74
			valueOut.set(new JsonFormat().printToString(OafDecoder.decode(value).getOaf()));
75
		}
76
		context.write(keyOut, valueOut);
77
	}
78

    
79
}
(3-3/10)