Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.actions2;
2

    
3
import java.io.IOException;
4

    
5
import eu.dnetlib.actionmanager.actions.AtomicAction;
6
import eu.dnetlib.data.mapreduce.JobParams;
7
import org.apache.hadoop.hbase.client.Durability;
8
import org.apache.hadoop.hbase.client.Put;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Mapper;
13

    
14
/**
15
 * Created by claudio on 11/04/16.
16
 */
17
public class PromoteActionSetFromHDFSMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> {
18

    
19
	private ImmutableBytesWritable keyOut;
20

    
21
	@Override
22
	protected void setup(final Context context) throws IOException, InterruptedException {
23
		super.setup(context);
24
		keyOut = new ImmutableBytesWritable();
25
	}
26

    
27
	@Override
28
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
29

    
30
		final AtomicAction aa = AtomicAction.fromJSON(value.toString());
31

    
32
		final byte[] keyOutBytes = Bytes.toBytes(aa.getTargetRowKey());
33
		keyOut.set(keyOutBytes);
34
		final Put put = new Put(keyOutBytes);
35
		put.setDurability(Durability.USE_DEFAULT);
36
		put.add(Bytes.toBytes(aa.getTargetColumnFamily()), Bytes.toBytes(aa.getTargetColumn()), aa.getTargetValue());
37

    
38
		context.write(keyOut, put);
39
	}
40

    
41
}
(2-2/3)