Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.bulktag;
2

    
3
import com.google.gson.Gson;
4
import com.jayway.jsonpath.DocumentContext;
5
import com.jayway.jsonpath.JsonPath;
6
import eu.dnetlib.data.bulktag.CommunityConfiguration;
7
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
8
import eu.dnetlib.data.proto.FieldTypeProtos;
9
import eu.dnetlib.data.proto.OafProtos;
10
import eu.dnetlib.data.proto.OafProtos.Oaf;
11
import org.apache.commons.lang.StringUtils;
12
import org.apache.hadoop.hbase.client.Put;
13
import org.apache.hadoop.hbase.client.Result;
14
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15
import org.apache.hadoop.hbase.mapreduce.TableMapper;
16
import org.apache.hadoop.hbase.util.Bytes;
17
import org.apache.hadoop.io.Writable;
18

    
19
import java.io.IOException;
20
import java.util.ArrayList;
21
import java.util.HashMap;
22
import java.util.List;
23
import java.util.Map;
24

    
25
public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> {
26

    
27
	private CommunityConfiguration cc;
28

    
29
	private ResultTagger tagger;
30
	private boolean enabled;
31

    
32
//	private Map<String,String> protoMappingParams;
33
//
34
//	private ProtoMappingParamEncapsulator param;
35
	private ProtoMap protoMappingParams;
36

    
37
	@Override
38
	protected void setup(final Context context) throws IOException, InterruptedException {
39
		super.setup(context);
40

    
41
		final String conf = context.getConfiguration().get("tagging.conf");
42
		enabled = context.getConfiguration().getBoolean("tagging.enabled",false);
43
		if (StringUtils.isBlank(conf)) {
44
			throw new IllegalArgumentException("missing bulk tagging configuration");
45
		}
46
		System.out.println("conf = " + conf);
47
		cc = CommunityConfigurationFactory.fromJson(conf);
48

    
49
		tagger = new ResultTagger();
50
		tagger.setTrust(context.getConfiguration().get("bulktagging.trust", "0.85"));
51

    
52

    
53
		String mappingProto = context.getConfiguration().get("mapping.proto", "{}");
54

    
55
		System.out.println(String.format("got mapping.proto: '%s'", mappingProto));
56

    
57
		protoMappingParams = new Gson().fromJson(mappingProto,ProtoMap.class);
58
		/*param = new Gson().fromJson(mappingProto, ProtoMappingParamEncapsulator.class);
59
		System.out.println("Sto usando la nuova implementazione");
60
		protoMappingParams = param.getP();*/
61
	}
62

    
63
	@Override
64
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
65

    
66
		final Map<byte[], byte[]> resultMap = value.getFamilyMap(Bytes.toBytes("result"));
67

    
68
		final byte[] body = resultMap.get(Bytes.toBytes("body"));
69

    
70
		if (body != null) {
71
			context.getCounter("Bulk Tagging", "not null body ").increment(1);
72

    
73
			//final Oaf tagged = tagger.enrichContext(Oaf.parseFrom(body), cc, context);
74
			final Oaf oaf = Oaf.parseFrom(body);
75

    
76
			final Oaf tagged = tagger.enrichContextCriteria(oaf,cc,context, protoMappingParams);// (Oaf.parseFrom(body), cc, context,protoMappingParams);
77
			if (tagged == null) {
78
				return;
79
			}
80

    
81
			long nTagged = tagged.getEntity().getResult().getMetadata().getContextList().stream()
82
					.flatMap(c -> c.getDataInfoList().stream())
83
					.map(FieldTypeProtos.DataInfo::getInferenceprovenance)
84
					.filter(infProv -> "bulktagging".equals(infProv))
85
					.count();
86
			context.getCounter("Bulk Tagging", " bulktagged ").increment(nTagged);
87

    
88
			final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), tagged.toByteArray());
89

    
90

    
91
			if (enabled) {
92
				context.write(key, put);
93
				context.getCounter("Bulk Tagging", " write op ").increment(1);
94
			}
95

    
96
		}
97
		else{
98
			context.getCounter("Bulk Tagging", " null body ").increment(1);
99
		}
100

    
101
	}
102

    
103
	private Map<String,List<String>> getParamMap(byte[] body){
104
		Map<String,List<String>> param = new HashMap<>();
105
		DocumentContext jsonContext = JsonPath.parse(body);
106

    
107
		for(String key: protoMappingParams.keySet()) {
108
			try {
109
				param.put(key, jsonContext.read(protoMappingParams.get(key)));
110
			} catch (Exception e) {
111
				param.put(key, new ArrayList<>());
112
			}
113
		}
114
		return param;
115

    
116
	}
117

    
118
}
(1-1/4)