Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import com.google.common.base.Splitter;
4
import com.google.common.collect.Lists;
5
import eu.dnetlib.data.bulktag.CommunityConfiguration;
6
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
7
import eu.dnetlib.data.proto.FieldTypeProtos;
8
import eu.dnetlib.data.proto.OafProtos.Oaf;
9
import eu.dnetlib.data.proto.ResultProtos;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.hadoop.hbase.client.Put;
12
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
import org.apache.hadoop.hbase.util.Bytes;
16
import org.apache.hadoop.io.Writable;
17

    
18
import java.io.IOException;
19
import java.util.List;
20
import java.util.Map;
21

    
22
public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> {
23

    
24
	private CommunityConfiguration cc;
25

    
26
	private ResultTagger tagger;
27
	private boolean enabled;
28

    
29
	@Override
30
	protected void setup(final Context context) throws IOException, InterruptedException {
31
		super.setup(context);
32

    
33
		final String conf = context.getConfiguration().get("tagging.conf");
34
		enabled = context.getConfiguration().getBoolean("tagging.enabled",false);
35
		if (StringUtils.isBlank(conf)) {
36
			throw new IllegalArgumentException("missing bulk tagging configuration");
37
		}
38
		System.out.println("conf = " + conf);
39
		cc = CommunityConfigurationFactory.fromJson(conf);
40
		tagger = new ResultTagger();
41
		tagger.setTrust(context.getConfiguration().get("bulktagging.trust", "0.85"));
42
	}
43

    
44
	@Override
45
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
46

    
47
		final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
48

    
49
		final byte[] body = resultMap.get(Bytes.toBytes("body"));
50

    
51
		if (body != null) {
52
			context.getCounter("Bulk Tagging", "not null body ").increment(1);
53

    
54
			final Oaf oaf = tagger.enrichContext(Oaf.parseFrom(body), cc, context);
55
			if (oaf == null) {
56
				//context.getCounter("In mapper", " null oaf ").increment(1);
57
				return;
58
			}
59

    
60
			long tagged = oaf.getEntity().getResult().getMetadata().getContextList().stream()
61
					.flatMap(c -> c.getDataInfoList().stream())
62
					.map(FieldTypeProtos.DataInfo::getInferenceprovenance)
63
					.filter(infProv -> "bulktagging".equals(infProv))
64
					.count();
65
			context.getCounter("Bulk Tagging", " bulktagged ").increment(tagged);
66

    
67

    
68
			final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), oaf.toByteArray());
69

    
70
			if(tagged > 0){
71
				if (enabled)
72
					context.write(key, put);
73
				context.getCounter("Bulk Tagging", " write op ").increment(1);
74
			}
75

    
76
		}
77
		else{
78
			context.getCounter("Bulk Tagging", " null body ").increment(1);
79
		}
80

    
81
	}
82

    
83

    
84
}
(1-1/3)