Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import eu.dnetlib.data.bulktag.CommunityConfiguration;
4
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
5
import eu.dnetlib.data.proto.OafProtos.Oaf;
6
import org.apache.commons.lang.StringUtils;
7
import org.apache.hadoop.hbase.client.Put;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12
import org.apache.hadoop.io.Writable;
13

    
14
import java.io.IOException;
15
import java.util.Map;
16

    
17
public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> {
18

    
19
	private CommunityConfiguration cc;
20

    
21
	private ResultTagger tagger;
22

    
23
	@Override
24
	protected void setup(final Context context) throws IOException, InterruptedException {
25
		super.setup(context);
26

    
27
		final String conf = context.getConfiguration().get("tagging.conf");
28
		if (StringUtils.isBlank(conf)) {
29
			throw new IllegalArgumentException("missing bulk tagging configuration");
30
		}
31
		cc = CommunityConfigurationFactory.fromJson(conf);
32
		tagger = new ResultTagger();
33
	}
34

    
35
	@Override
36
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
37

    
38
		final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
39

    
40
		final byte[] body = resultMap.get(Bytes.toBytes("body"));
41

    
42
		if (body != null) {
43

    
44
			final Oaf oaf = tagger.enrichContext(Oaf.parseFrom(body), cc, context);
45
			if (oaf == null)
46
				return;
47
			final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), oaf.toByteArray());
48

    
49
			context.write(key, put);
50
		}
51

    
52
	}
53

    
54

    
55
}
(1-1/3)