Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import eu.dnetlib.data.bulktag.CommunityConfiguration;
4
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
5
import eu.dnetlib.data.proto.FieldTypeProtos;
6
import eu.dnetlib.data.proto.OafProtos.Oaf;
7
import eu.dnetlib.data.proto.ResultProtos;
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.hadoop.hbase.client.Put;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.hbase.util.Bytes;
14
import org.apache.hadoop.io.Writable;
15

    
16
import java.io.IOException;
17
import java.util.List;
18
import java.util.Map;
19

    
20
public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> {
21

    
22
	private CommunityConfiguration cc;
23

    
24
	private ResultTagger tagger;
25
	private boolean enabled;
26

    
27
	@Override
28
	protected void setup(final Context context) throws IOException, InterruptedException {
29
		super.setup(context);
30

    
31
		final String conf = context.getConfiguration().get("tagging.conf");
32
		enabled = context.getConfiguration().getBoolean("enabled",false);
33
		if (StringUtils.isBlank(conf)) {
34
			throw new IllegalArgumentException("missing bulk tagging configuration");
35
		}
36
		System.out.println("conf = " + conf);
37
		cc = CommunityConfigurationFactory.fromJson(conf);
38
		tagger = new ResultTagger();
39
	}
40

    
41
	@Override
42
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
43

    
44
		final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
45

    
46
		final byte[] body = resultMap.get(Bytes.toBytes("body"));
47

    
48
		if (body != null) {
49
			context.getCounter("Bulk Tagging", "not null body ").increment(1);
50

    
51
			final Oaf oaf = tagger.enrichContext(Oaf.parseFrom(body), cc, context);
52
			if (oaf == null) {
53
				//context.getCounter("In mapper", " null oaf ").increment(1);
54
				return;
55
			}
56

    
57
			long tagged = oaf.getEntity().getResult().getMetadata().getContextList().stream()
58
					.flatMap(c -> c.getDataInfoList().stream())
59
					.map(FieldTypeProtos.DataInfo::getInferenceprovenance)
60
					.filter(infProv -> "bulktagging::community".equals(infProv))
61
					.count();
62
			context.getCounter("Bulk Tagging", " bulktagged ").increment(tagged);
63

    
64

    
65
			final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), oaf.toByteArray());
66

    
67
			if(tagged > 0){
68
				if (enabled)
69
					context.write(key, put);
70
				context.getCounter("Bulk Tagging", " write op ").increment(1);
71
			}
72

    
73
		}
74
		else{
75
			context.getCounter("Bulk Tagging", " null body ").increment(1);
76
		}
77

    
78
	}
79

    
80

    
81
}
(1-1/3)