Project

General

Profile

1 42247 claudio.at
package eu.dnetlib.data.mapreduce.hbase.actions2;
2
3
import java.io.IOException;
4
5
import eu.dnetlib.actionmanager.actions.AtomicAction;
6 42590 claudio.at
import eu.dnetlib.data.mapreduce.JobParams;
7 42247 claudio.at
import org.apache.hadoop.hbase.client.Put;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Mapper;
12
13
/**
14
 * Created by claudio on 11/04/16.
15
 */
16
public class PromoteActionSetFromHDFSMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> {
17
18
	private ImmutableBytesWritable keyOut;
19
20
	@Override
21
	protected void setup(final Context context) throws IOException, InterruptedException {
22
		super.setup(context);
23
		keyOut = new ImmutableBytesWritable();
24
	}
25
26
	@Override
27
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
28
29
		final AtomicAction aa = AtomicAction.fromJSON(value.toString());
30
31
		final byte[] keyOutBytes = Bytes.toBytes(aa.getTargetRowKey());
32
		keyOut.set(keyOutBytes);
33
		final Put put = new Put(keyOutBytes);
34 42590 claudio.at
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
35 42247 claudio.at
		put.add(Bytes.toBytes(aa.getTargetColumnFamily()), Bytes.toBytes(aa.getTargetColumn()), aa.getTargetValue());
36
37
		context.write(keyOut, put);
38
	}
39
40
}