Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.actions2;
2

    
3
import java.io.IOException;
4

    
5
import eu.dnetlib.actionmanager.actions.AtomicAction;
6
import eu.dnetlib.data.mapreduce.JobParams;
7
import org.apache.hadoop.hbase.client.Put;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Mapper;
12

    
13
/**
14
 * Created by claudio on 11/04/16.
15
 */
16
public class PromoteActionSetFromHDFSMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> {
17

    
18
	private ImmutableBytesWritable keyOut;
19

    
20
	@Override
21
	protected void setup(final Context context) throws IOException, InterruptedException {
22
		super.setup(context);
23
		keyOut = new ImmutableBytesWritable();
24
	}
25

    
26
	@Override
27
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
28

    
29
		final AtomicAction aa = AtomicAction.fromJSON(value.toString());
30

    
31
		final byte[] keyOutBytes = Bytes.toBytes(aa.getTargetRowKey());
32
		keyOut.set(keyOutBytes);
33
		final Put put = new Put(keyOutBytes);
34
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
35
		put.add(Bytes.toBytes(aa.getTargetColumnFamily()), Bytes.toBytes(aa.getTargetColumn()), aa.getTargetValue());
36

    
37
		context.write(keyOut, put);
38
	}
39

    
40
}
(2-2/3)