Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.actions2;
2

    
3
import java.io.IOException;
4
import java.util.Set;
5

    
6
import org.apache.hadoop.hbase.client.Put;
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
import org.apache.hadoop.hbase.util.Bytes;
11

    
12
import com.google.common.base.Splitter;
13
import com.google.common.collect.Sets;
14

    
15
public class PromoteActionsMapper extends TableMapper<ImmutableBytesWritable, Put> {
16

    
17
	private static final String LATEST_RAW_SETS = "latestRawSets";
18

    
19
	private Set<byte[]> latestRawSets = Sets.newHashSet();
20

    
21
	@Override
22
	protected void setup(final Context context) throws IOException, InterruptedException {
23

    
24
		final String s = context.getConfiguration().get(LATEST_RAW_SETS);
25
		if (s != null) {
26
			for (String set : Sets.newHashSet(Splitter.on(",").omitEmptyStrings().trimResults().split(s))) {
27
				latestRawSets.add(Bytes.toBytes(set));
28
			}
29
		}
30

    
31
		if (latestRawSets.isEmpty()) { throw new IOException("Input parameter (" + LATEST_RAW_SETS + ") is missing or empty: " + s); }
32
	}
33

    
34
	@Override
35
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
36
		for (byte[] s1 : value.getFamilyMap(Bytes.toBytes("set")).keySet()) {
37
			for (byte[] s2 : latestRawSets) {
38
				if (Bytes.equals(s1, s2)) {
39
					promoteAction(value, context);
40
				}
41
			}
42
		}
43
	}
44

    
45
	private void promoteAction(final Result value, final Context context) throws IOException, InterruptedException {
46
		final byte[] cf = Bytes.toBytes("target");
47
		final byte[] tkey = value.getValue(cf, Bytes.toBytes("rowKey"));
48
		final byte[] tcf = value.getValue(cf, Bytes.toBytes("columnFamily"));
49
		final byte[] tc = value.getValue(cf, Bytes.toBytes("column"));
50
		final byte[] tv = value.getValue(cf, Bytes.toBytes("content"));
51

    
52
		final Put put = new Put(tkey);
53
		put.add(tcf, tc, tv);
54

    
55
		context.getCounter("Actions", Bytes.toString(tcf)).increment(1);
56

    
57
		context.write(new ImmutableBytesWritable(tkey), put);
58
	}
59
}
(3-3/3)