2 |
2 |
|
3 |
3 |
import eu.dnetlib.data.bulktag.CommunityConfiguration;
|
4 |
4 |
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory;
|
|
5 |
import eu.dnetlib.data.proto.FieldTypeProtos;
|
5 |
6 |
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
|
7 |
import eu.dnetlib.data.proto.ResultProtos;
|
6 |
8 |
import org.apache.commons.lang.StringUtils;
|
7 |
9 |
import org.apache.hadoop.hbase.client.Put;
|
8 |
10 |
import org.apache.hadoop.hbase.client.Result;
|
... | ... | |
12 |
14 |
import org.apache.hadoop.io.Writable;
|
13 |
15 |
|
14 |
16 |
import java.io.IOException;
|
|
17 |
import java.util.List;
|
15 |
18 |
import java.util.Map;
|
16 |
19 |
|
17 |
20 |
public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> {
|
... | ... | |
41 |
44 |
|
42 |
45 |
if (body != null) {
|
43 |
46 |
context.getCounter("In mapper", "not null body ").increment(1);
|
|
47 |
|
44 |
48 |
final Oaf oaf = tagger.enrichContext(Oaf.parseFrom(body), cc, context);
|
45 |
49 |
if (oaf == null) {
|
46 |
50 |
context.getCounter("In mapper", " null oaf ").increment(1);
|
47 |
51 |
return;
|
48 |
52 |
}
|
|
53 |
|
|
54 |
long tagged = oaf.getEntity().getResult().getMetadata().getContextList().stream()
|
|
55 |
.flatMap(c -> c.getDataInfoList().stream())
|
|
56 |
.map(FieldTypeProtos.DataInfo::getInferenceprovenance)
|
|
57 |
.filter(infProv -> "bulktagging::community".equals(infProv))
|
|
58 |
.count();
|
|
59 |
context.getCounter("In mapper", " bulktagged ").increment(tagged);
|
|
60 |
|
|
61 |
//
|
|
62 |
// List<ResultProtos.Result.Context> tmp = oaf.getEntity().getResult().getMetadata().getContextList();
|
|
63 |
// for(ResultProtos.Result.Context c:tmp){
|
|
64 |
// for(FieldTypeProtos.DataInfo d:c.getDataInfoList()){
|
|
65 |
// if (d.getInferenceprovenance().equals("bulktagging::community")){
|
|
66 |
// context.getCounter("In mapper", " bulktagged ").increment(1);
|
|
67 |
// }
|
|
68 |
// }
|
|
69 |
// }
|
|
70 |
|
49 |
71 |
final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), oaf.toByteArray());
|
50 |
72 |
|
51 |
73 |
context.write(key, put);
|
52 |
74 |
context.getCounter("In mapper", " write op ").increment(1);
|
53 |
75 |
}
|
|
76 |
else{
|
|
77 |
context.getCounter("In mapper", " null body ").increment(1);
|
|
78 |
}
|
54 |
79 |
|
55 |
80 |
}
|
56 |
81 |
|
Adding counters for testing