Revision 52878
Added by Claudio Atzori over 5 years ago
introduced subType in pace wf configuration

DedupMapper.java

```diff
@@ -13,6 +13,8 @@
 import eu.dnetlib.data.proto.TypeProtos.Type;
 import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
 import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.config.WfConfig;
+import eu.dnetlib.pace.model.Field;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.model.ProtoDocumentBuilder;
 import org.apache.commons.logging.Log;
@@ -22,6 +24,7 @@
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
 
 public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> {
 
@@ -50,12 +53,12 @@
         outKey = new Text();
         ibw = new ImmutableBytesWritable();
 
-        log.info("pace conf");
-        log.info("entity type: " + dedupConf.getWf().getEntityType());
-        log.info("clustering: " + dedupConf.getPace().getClustering());
-        log.info("conditions: " + dedupConf.getPace().getConditions());
-        log.info("fields: " + dedupConf.getPace().getModel());
-        log.info("blacklists: " + blackListMap);
+        //log.info("pace conf");
+        //log.info("entity type: " + dedupConf.getWf().getEntityType());
+        //log.info("clustering: " + dedupConf.getPace().getClustering());
+        //log.info("conditions: " + dedupConf.getPace().getConditions());
+        //log.info("fields: " + dedupConf.getPace().getModel());
+        //log.info("blacklists: " + blackListMap);
         log.info("wf conf: " + dedupConf.toString());
     }
 
@@ -63,13 +66,14 @@
     protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
         // log.info("got key: " + new String(keyIn.copyBytes()));
 
-        final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
+        final WfConfig wf = dedupConf.getWf();
+        final byte[] body = result.getValue(wf.getEntityType().getBytes(), DedupUtils.BODY_B);
 
         if (body != null) {
 
             final OafDecoder decoder = OafDecoder.decode(body);
             if (decoder.getOaf().getDataInfo().getDeletedbyinference()) {
-                context.getCounter(dedupConf.getWf().getEntityType(), "deleted by inference").increment(1);
+                context.getCounter(wf.getEntityType(), "deleted by inference").increment(1);
                 return;
             }
 
@@ -77,24 +81,35 @@
 
             context.getCounter(entity.getType().toString(), "decoded").increment(1);
 
-            if (entity.getType().equals(Type.valueOf(dedupConf.getWf().getEntityType()))) {
+            if (entity.getType().equals(Type.valueOf(wf.getEntityType()))) {
 
-                // TODO: remove this hack - here because we want to dedup only publications and organizazions
-                if (shouldDedup(entity)) {
-                    final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
+                final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
+                context.getCounter(entity.getType().toString(), "converted as MapDocument").increment(1);
+
+                if (wf.hasSubType()) {
+
+                    final Map<String, Field> fields = doc.getFieldMap();
+
+                    if (!fields.containsKey(wf.getSubEntityType())) {
+                        throw new IllegalStateException(String.format("model map does not contain field %s", wf.getSubEntityType()));
+                    }
+
+                    final String subType = fields.get(wf.getSubEntityType()).stringValue();
+                    if (wf.getSubEntityValue().equalsIgnoreCase(subType)) {
+                        context.getCounter(subType, "converted as MapDocument").increment(1);
+                        emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
+                    } else {
+                        context.getCounter(subType, "ignored").increment(1);
+                    }
+                } else {
                     emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
                 }
             }
         } else {
-            context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
+            context.getCounter(wf.getEntityType(), "missing body").increment(1);
         }
     }
 
-    private boolean shouldDedup(final OafEntity entity) {
-        return (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("publication")) ||
-                entity.getType().equals(Type.organization);
-    }
-
     private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
         for (final String ngram : ngrams) {
             outKey.set(ngram);
```
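In short, this revision replaces the hard-coded `shouldDedup()` hack (which limited deduplication to publications and organizations) with a configuration-driven filter: when the workflow configuration declares a sub-type (`wf.hasSubType()`), the mapper looks up the configured sub-entity field in the document's field map and only emits n-grams for documents whose value matches `subEntityValue`. The following self-contained sketch illustrates that gating logic; the stub `Wf` class and the `accept`/`SubTypeGateSketch` names are hypothetical stand-ins for illustration, not the real `eu.dnetlib.pace` API, though the accessor names mirror those used in the diff.

```java
import java.util.HashMap;
import java.util.Map;

public class SubTypeGateSketch {

    // Minimal stand-in for the sub-type part of WfConfig (hypothetical).
    static class Wf {
        String subEntityType;   // e.g. "resulttype"
        String subEntityValue;  // e.g. "publication"

        boolean hasSubType() {
            return subEntityType != null && subEntityValue != null;
        }
    }

    // Returns true when the document should proceed to clustering/emission.
    static boolean accept(final Wf wf, final Map<String, String> fields) {
        if (!wf.hasSubType()) {
            return true; // no sub-type filter configured: dedup everything
        }
        final String subType = fields.get(wf.subEntityType);
        if (subType == null) {
            // mirrors the diff: the model must expose the sub-entity field
            throw new IllegalStateException(String.format("model map does not contain field %s", wf.subEntityType));
        }
        return wf.subEntityValue.equalsIgnoreCase(subType); // emit only matching sub-type
    }

    public static void main(final String[] args) {
        final Wf wf = new Wf();
        wf.subEntityType = "resulttype";
        wf.subEntityValue = "publication";

        final Map<String, String> doc = new HashMap<>();
        doc.put("resulttype", "publication");

        System.out.println(accept(wf, doc)); // true: document would be emitted
    }
}
```

The design choice here is to push the "which entities to dedup" decision out of the mapper and into the pace workflow configuration, so new sub-type restrictions can be expressed without code changes; mismatching documents are counted under an "ignored" counter rather than silently dropped.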