Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import eu.dnetlib.data.bulktag.CommunityConfiguration;
4
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
5
import eu.dnetlib.data.proto.FieldTypeProtos;
6
import eu.dnetlib.data.proto.OafProtos.Oaf;
7
import eu.dnetlib.data.proto.ResultProtos;
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.hadoop.hbase.client.Put;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.hbase.util.Bytes;
14
import org.apache.hadoop.io.Writable;
15

    
16
import java.io.IOException;
17
import java.util.List;
18
import java.util.Map;
19

    
20
public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> {
21

    
22
	private CommunityConfiguration cc;
23

    
24
	private ResultTagger tagger;
25

    
26
	@Override
27
	protected void setup(final Context context) throws IOException, InterruptedException {
28
		super.setup(context);
29

    
30
		final String conf = context.getConfiguration().get("tagging.conf");
31
		if (StringUtils.isBlank(conf)) {
32
			throw new IllegalArgumentException("missing bulk tagging configuration");
33
		}
34
		cc = CommunityConfigurationFactory.fromJson(conf);
35
		tagger = new ResultTagger();
36
	}
37

    
38
	@Override
39
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
40

    
41
		final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
42

    
43
		final byte[] body = resultMap.get(Bytes.toBytes("body"));
44

    
45
		if (body != null) {
46
			context.getCounter("In mapper", "not null body ").increment(1);
47

    
48
			final Oaf oaf = tagger.enrichContext(Oaf.parseFrom(body), cc, context);
49
			if (oaf == null) {
50
				context.getCounter("In mapper", " null oaf ").increment(1);
51
				return;
52
			}
53

    
54
			long tagged = oaf.getEntity().getResult().getMetadata().getContextList().stream()
55
					.flatMap(c -> c.getDataInfoList().stream())
56
					.map(FieldTypeProtos.DataInfo::getInferenceprovenance)
57
					.filter(infProv -> "bulktagging::community".equals(infProv))
58
					.count();
59
			context.getCounter("In mapper", " bulktagged ").increment(tagged);
60

    
61

    
62

    
63
			final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), oaf.toByteArray());
64

    
65
			context.write(key, put);
66
			context.getCounter("In mapper", " write op ").increment(1);
67
		}
68
		else{
69
			context.getCounter("In mapper", " null body ").increment(1);
70
		}
71

    
72
	}
73

    
74

    
75
}
(1-1/3)