Revision 52883
Added by Claudio Atzori over 5 years ago
DedupReducer.java | ||
---|---|---|
10 | 10 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
11 | 11 |
import eu.dnetlib.pace.clustering.NGramUtils; |
12 | 12 |
import eu.dnetlib.pace.config.DedupConfig; |
13 |
import eu.dnetlib.pace.config.WfConfig; |
|
13 | 14 |
import eu.dnetlib.pace.distance.PaceDocumentDistance; |
14 | 15 |
import eu.dnetlib.pace.distance.eval.ScoreResult; |
15 | 16 |
import eu.dnetlib.pace.model.Field; |
... | ... | |
19 | 20 |
import org.apache.commons.lang.StringUtils; |
20 | 21 |
import org.apache.commons.logging.Log; |
21 | 22 |
import org.apache.commons.logging.LogFactory; |
23 |
import org.apache.hadoop.hbase.client.Durability; |
|
22 | 24 |
import org.apache.hadoop.hbase.client.Put; |
23 | 25 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
24 | 26 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
... | ... | |
50 | 52 |
if (q.size() > 1) { |
51 | 53 |
log.info("reducing key: '" + key + "' records: " + q.size()); |
52 | 54 |
|
53 |
switch (Type.valueOf(dedupConf.getWf().getEntityType())) { |
|
55 |
final Type type = Type.valueOf(dedupConf.getWf().getEntityType()); |
|
56 |
|
|
57 |
switch (type) { |
|
54 | 58 |
case result: |
55 | 59 |
process(simplifyQueue(q, key.toString(), context), context); |
56 | 60 |
break; |
... | ... | |
131 | 135 |
final Context context, |
132 | 136 |
final String fieldRef, |
133 | 137 |
final String ngram) { |
134 |
if (tempResults.size() < dedupConf.getWf().getGroupMaxSize()) { |
|
138 |
final WfConfig wfConf = dedupConf.getWf(); |
|
139 |
if (tempResults.size() < wfConf.getGroupMaxSize()) { |
|
135 | 140 |
q.addAll(tempResults); |
136 | 141 |
} else { |
137 |
context.getCounter(dedupConf.getWf().getEntityType(),
|
|
138 |
"Skipped records for count(" + dedupConf.getWf().getOrderField() + ") >= " + dedupConf.getWf().getGroupMaxSize())
|
|
142 |
context.getCounter(wfConf.getEntityType(),
|
|
143 |
"Skipped records for count(" + wfConf.getOrderField() + ") >= " + wfConf.getGroupMaxSize())
|
|
139 | 144 |
.increment(tempResults.size()); |
140 | 145 |
log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram); |
141 | 146 |
} |
... | ... | |
229 | 234 |
|
230 | 235 |
//final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, oaf.build().toByteArray()); |
231 | 236 |
final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, Bytes.toBytes("")); |
232 |
put.setWriteToWAL(JobParams.WRITE_TO_WAL);
|
|
237 |
put.setDurability(Durability.SKIP_WAL);
|
|
233 | 238 |
ibw.set(from); |
234 | 239 |
context.write(ibw, put); |
235 | 240 |
} |
Also available in: Unified diff
deprecation: use setDurability instead of setWriteToWAL