Revision 52919
Added by Miriam Baglioni over 5 years ago
BulkTaggingMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.bulktag; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
import java.util.function.Function; |
|
6 |
import java.util.stream.Collectors; |
|
7 |
|
|
8 |
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo; |
|
9 |
import eu.dnetlib.data.proto.FieldTypeProtos.Qualifier; |
|
3 |
import eu.dnetlib.data.bulktag.CommunityConfiguration; |
|
4 |
import eu.dnetlib.data.bulktag.CommunityConfigurationFactory; |
|
10 | 5 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
11 |
import eu.dnetlib.data.proto.ResultProtos; |
|
12 | 6 |
import org.apache.commons.lang.StringUtils; |
13 | 7 |
import org.apache.hadoop.hbase.client.Put; |
14 | 8 |
import org.apache.hadoop.hbase.client.Result; |
... | ... | |
17 | 11 |
import org.apache.hadoop.hbase.util.Bytes; |
18 | 12 |
import org.apache.hadoop.io.Writable; |
19 | 13 |
|
14 |
import java.io.IOException; |
|
15 |
import java.util.Map; |
|
16 |
|
|
20 | 17 |
public class BulkTaggingMapper extends TableMapper<ImmutableBytesWritable, Writable> { |
21 | 18 |
|
22 |
private String taggingConf = null;
|
|
19 |
private CommunityConfiguration cc;
|
|
23 | 20 |
|
24 |
private final static String COUNTER_GROUP = "Bulk Tagging";
|
|
21 |
private ResultTagger tagger;
|
|
25 | 22 |
|
26 | 23 |
@Override |
27 | 24 |
protected void setup(final Context context) throws IOException, InterruptedException { |
... | ... | |
31 | 28 |
if (StringUtils.isBlank(conf)) { |
32 | 29 |
throw new IllegalArgumentException("missing bulk tagging configuration"); |
33 | 30 |
} |
34 |
|
|
35 |
taggingConf = conf;
|
|
31 |
cc = CommunityConfigurationFactory.fromJson(conf); |
|
32 |
tagger = new ResultTagger();
|
|
36 | 33 |
} |
37 | 34 |
|
38 | 35 |
@Override |
... | ... | |
44 | 41 |
|
45 | 42 |
if (body != null) { |
46 | 43 |
|
47 |
final Oaf oaf = enrichContext(Oaf.parseFrom(body), context); |
|
48 |
|
|
44 |
final Oaf oaf = tagger.enrichContext(Oaf.parseFrom(body), cc, context); |
|
45 |
if (oaf == null) |
|
46 |
return; |
|
49 | 47 |
final Put put = new Put(key.copyBytes()).add(Bytes.toBytes("result"), Bytes.toBytes("body"), oaf.toByteArray()); |
50 | 48 |
|
51 | 49 |
context.write(key, put); |
... | ... | |
53 | 51 |
|
54 | 52 |
} |
55 | 53 |
|
56 |
private Oaf enrichContext(final Oaf oaf, final Context context) { |
|
57 |
final Oaf.Builder builder = Oaf.newBuilder(oaf); |
|
58 | 54 |
|
59 |
final Map<String, ResultProtos.Result.Context> contexts = oaf.getEntity().getResult().getMetadata().getContextList() |
|
60 |
.stream() |
|
61 |
.collect(Collectors.toMap( |
|
62 |
c -> c.getId(), |
|
63 |
Function.identity(), |
|
64 |
(c1, c2) -> c1)); |
|
65 |
|
|
66 |
final String contextId = "xxx"; |
|
67 |
|
|
68 |
final ResultProtos.Result.Context c = contexts.get(contextId); |
|
69 |
if (c != null) { |
|
70 |
// add new dataInfo |
|
71 |
|
|
72 |
context.getCounter(COUNTER_GROUP, "add provenance").increment(1); |
|
73 |
} else { |
|
74 |
builder.getEntityBuilder().getResultBuilder().getMetadataBuilder().addContext(buildContext(contextId)); |
|
75 |
|
|
76 |
context.getCounter(COUNTER_GROUP, "add context").increment(1); |
|
77 |
|
|
78 |
} |
|
79 |
return builder.build(); |
|
80 |
} |
|
81 |
|
|
82 |
private ResultProtos.Result.Context buildContext(final String contextId) { |
|
83 |
return ResultProtos.Result.Context.newBuilder() |
|
84 |
.setId(contextId) |
|
85 |
.setDataInfo(DataInfo.newBuilder() |
|
86 |
.setInferred(true) |
|
87 |
.setProvenanceaction( |
|
88 |
Qualifier.newBuilder() |
|
89 |
.setClassid("bulktagging::community") |
|
90 |
.setClassname("Bulk Tagging for Communities") |
|
91 |
.setSchemeid("dnet:provenanceActions") |
|
92 |
.setSchemename("dnet:provenanceActions")) |
|
93 |
.setInferenceprovenance("bulktagging::community") |
|
94 |
.setTrust("0.85") |
|
95 |
.build()) |
|
96 |
.build(); |
|
97 |
} |
|
98 | 55 |
} |
Also available in: Unified diff
bulk tagging mapper and tagger