Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import java.io.IOException;
4

    
5
import eu.dnetlib.data.mapreduce.JobParams;
6
import org.apache.commons.io.IOUtils;
7
import org.apache.commons.lang.StringUtils;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.client.Put;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14
import org.apache.hadoop.mapreduce.Mapper;
15

    
16
import com.googlecode.protobuf.format.JsonFormat;
17

    
18
import eu.dnetlib.data.proto.OafProtos.Oaf;
19

    
20
public class ImportInformationSpaceDumpMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> {
21

    
22
	/**
23
	 * logger.
24
	 */
25
	private static final Log log = LogFactory.getLog(ImportInformationSpaceDumpMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
26

    
27
	private static final String SEPARATOR = "@";
28

    
29
	private ImmutableBytesWritable ibw;
30

    
31
	@Override
32
	protected void setup(final Context context) throws IOException, InterruptedException {
33
		super.setup(context);
34
		ibw = new ImmutableBytesWritable();
35
	}
36

    
37
	@Override
38
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
39
		try {
40

    
41
			String[] split = key.toString().split(SEPARATOR);
42

    
43
			if (split.length != 3)
44
				throw new IllegalArgumentException("malformed key: " + key.toString());
45

    
46
			byte[] rowKey = Bytes.toBytes(split[0]);
47
			byte[] columnFamily = Bytes.toBytes(split[1]);
48
			byte[] qualifier = Bytes.toBytes(split[2]);
49

    
50
			final Put put = new Put(rowKey);
51
			put.setWriteToWAL(JobParams.WRITE_TO_WAL);
52
			ibw.set(rowKey);
53

    
54
			if (StringUtils.isNotBlank(value.toString())) {
55

    
56
				Oaf.Builder oaf = Oaf.newBuilder();
57
				JsonFormat.merge(value.toString(), oaf);
58

    
59
				put.add(columnFamily, qualifier, oaf.build().toByteArray());
60
			} else {
61
				put.add(columnFamily, qualifier, Bytes.toBytes(""));
62
			}
63

    
64
			context.write(ibw, put);
65

    
66
		} catch (final Throwable e) {
67
			log.error(String.format("error importing the following record on HBase: \nkey: '%s'\nvalue: %s", key.toString(), value.toString()), e);
68
			context.getCounter("error", e.getClass().getName()).increment(1);
69
			throw new RuntimeException(e);
70
		}
71
	}
72

    
73
}
(11-11/18)