3 |
3 |
import java.io.IOException;
|
4 |
4 |
import java.util.*;
|
5 |
5 |
|
|
6 |
import com.google.common.base.Function;
|
|
7 |
import com.google.common.collect.Iterables;
|
6 |
8 |
import com.google.common.collect.Lists;
|
|
9 |
import com.google.protobuf.InvalidProtocolBufferException;
|
7 |
10 |
import eu.dnetlib.data.mapreduce.JobParams;
|
8 |
11 |
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
|
12 |
import eu.dnetlib.data.mapreduce.util.StreamUtils;
|
|
13 |
import eu.dnetlib.data.proto.OafProtos;
|
9 |
14 |
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
|
10 |
15 |
import eu.dnetlib.data.proto.TypeProtos.Type;
|
11 |
16 |
import eu.dnetlib.pace.clustering.NGramUtils;
|
... | ... | |
13 |
18 |
import eu.dnetlib.pace.config.WfConfig;
|
14 |
19 |
import eu.dnetlib.pace.distance.PaceDocumentDistance;
|
15 |
20 |
import eu.dnetlib.pace.distance.eval.ScoreResult;
|
16 |
|
import eu.dnetlib.pace.model.Field;
|
17 |
|
import eu.dnetlib.pace.model.MapDocument;
|
18 |
|
import eu.dnetlib.pace.model.MapDocumentComparator;
|
19 |
|
import eu.dnetlib.pace.model.MapDocumentSerializer;
|
|
21 |
import eu.dnetlib.pace.model.*;
|
|
22 |
import eu.dnetlib.pace.util.BlockProcessor;
|
|
23 |
import eu.dnetlib.pace.util.Reporter;
|
20 |
24 |
import org.apache.commons.lang.StringUtils;
|
21 |
25 |
import org.apache.commons.logging.Log;
|
22 |
26 |
import org.apache.commons.logging.LogFactory;
|
... | ... | |
27 |
31 |
import org.apache.hadoop.hbase.util.Bytes;
|
28 |
32 |
import org.apache.hadoop.io.Text;
|
29 |
33 |
|
|
34 |
import javax.annotation.Nullable;
|
|
35 |
|
30 |
36 |
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
|
31 |
37 |
|
32 |
38 |
private static final Log log = LogFactory.getLog(DedupReducer.class);
|
... | ... | |
45 |
51 |
}
|
46 |
52 |
|
47 |
53 |
@Override
|
48 |
|
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
|
|
54 |
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
|
49 |
55 |
|
50 |
|
final Queue<MapDocument> q = prepare(context, key, values);
|
51 |
|
|
52 |
|
if (q.size() > 1) {
|
53 |
|
log.info("reducing key: '" + key + "' records: " + q.size());
|
54 |
|
|
55 |
|
final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
|
56 |
|
|
57 |
|
switch (type) {
|
58 |
|
case result:
|
59 |
|
process(simplifyQueue(q, key.toString(), context), context);
|
60 |
|
break;
|
61 |
|
case organization:
|
62 |
|
process(q, context);
|
63 |
|
break;
|
64 |
|
default:
|
65 |
|
throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
|
|
56 |
final Iterable<MapDocument> docs = Iterables.transform(values, new Function<ImmutableBytesWritable, MapDocument>() {
|
|
57 |
@Nullable
|
|
58 |
@Override
|
|
59 |
public MapDocument apply(@Nullable ImmutableBytesWritable b) {
|
|
60 |
return MapDocumentSerializer.decode(b.copyBytes());
|
66 |
61 |
}
|
67 |
|
} else {
|
68 |
|
context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
|
69 |
|
}
|
70 |
|
}
|
|
62 |
});
|
71 |
63 |
|
72 |
|
private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
|
73 |
|
final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
|
|
64 |
new BlockProcessor(dedupConf).process(key.toString(), docs, new Reporter() {
|
74 |
65 |
|
75 |
|
final Set<String> seen = new HashSet<String>();
|
76 |
|
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
|
77 |
|
|
78 |
|
boolean logged = false;
|
79 |
|
|
80 |
|
for (final ImmutableBytesWritable i : values) {
|
81 |
|
if (queue.size() <= queueMaxSize) {
|
82 |
|
final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
|
83 |
|
final String id = doc.getIdentifier();
|
84 |
|
|
85 |
|
if (!seen.contains(id)) {
|
86 |
|
seen.add(id);
|
87 |
|
queue.add(doc);
|
88 |
|
}
|
89 |
|
|
90 |
|
} else {
|
91 |
|
if (!logged) {
|
92 |
|
// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
|
93 |
|
context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
|
94 |
|
log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
|
95 |
|
logged = true;
|
96 |
|
}
|
|
66 |
@Override
|
|
67 |
public void incrementCounter(String counterGroup, String counterName, long delta) {
|
|
68 |
context.getCounter(counterGroup, counterName).increment(delta);
|
97 |
69 |
}
|
98 |
|
}
|
99 |
70 |
|
100 |
|
return queue;
|
101 |
|
}
|
|
71 |
@Override
|
|
72 |
public void emit(String type, String from, String to) {
|
102 |
73 |
|
103 |
|
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
|
104 |
|
final Queue<MapDocument> q = new LinkedList<MapDocument>();
|
105 |
|
|
106 |
|
String fieldRef = "";
|
107 |
|
final List<MapDocument> tempResults = Lists.newArrayList();
|
108 |
|
|
109 |
|
while (!queue.isEmpty()) {
|
110 |
|
final MapDocument result = queue.remove();
|
111 |
|
|
112 |
|
final String orderFieldName = dedupConf.getWf().getOrderField();
|
113 |
|
final Field orderFieldValue = result.values(orderFieldName);
|
114 |
|
if (!orderFieldValue.isEmpty()) {
|
115 |
|
final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
|
116 |
|
if (field.equals(fieldRef)) {
|
117 |
|
tempResults.add(result);
|
118 |
|
} else {
|
119 |
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
120 |
|
tempResults.clear();
|
121 |
|
tempResults.add(result);
|
122 |
|
fieldRef = field;
|
123 |
|
}
|
124 |
|
} else {
|
125 |
|
context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
|
|
74 |
emitRel(context, type, from, to);
|
|
75 |
emitRel(context, type, to, from);
|
126 |
76 |
}
|
127 |
|
}
|
128 |
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
129 |
77 |
|
130 |
|
return q;
|
131 |
|
}
|
132 |
|
|
133 |
|
private void populateSimplifiedQueue(final Queue<MapDocument> q,
|
134 |
|
final List<MapDocument> tempResults,
|
135 |
|
final Context context,
|
136 |
|
final String fieldRef,
|
137 |
|
final String ngram) {
|
138 |
|
final WfConfig wfConf = dedupConf.getWf();
|
139 |
|
if (tempResults.size() < wfConf.getGroupMaxSize()) {
|
140 |
|
q.addAll(tempResults);
|
141 |
|
} else {
|
142 |
|
context.getCounter(wfConf.getEntityType(),
|
143 |
|
"Skipped records for count(" + wfConf.getOrderField() + ") >= " + wfConf.getGroupMaxSize())
|
144 |
|
.increment(tempResults.size());
|
145 |
|
log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
|
146 |
|
}
|
147 |
|
}
|
148 |
|
|
149 |
|
private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
|
150 |
|
|
151 |
|
final PaceDocumentDistance algo = new PaceDocumentDistance();
|
152 |
|
|
153 |
|
while (!queue.isEmpty()) {
|
154 |
|
|
155 |
|
final MapDocument pivot = queue.remove();
|
156 |
|
final String idPivot = pivot.getIdentifier();
|
157 |
|
|
158 |
|
final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
|
159 |
|
final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
|
160 |
|
|
161 |
|
if (fieldPivot != null) {
|
162 |
|
// System.out.println(idPivot + " --> " + fieldPivot);
|
163 |
|
|
164 |
|
int i = 0;
|
165 |
|
for (final MapDocument curr : queue) {
|
166 |
|
final String idCurr = curr.getIdentifier();
|
167 |
|
|
168 |
|
if (mustSkip(idCurr)) {
|
169 |
|
context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
|
170 |
|
break;
|
171 |
|
}
|
172 |
|
|
173 |
|
if (i > dedupConf.getWf().getSlidingWindowSize()) {
|
174 |
|
break;
|
175 |
|
}
|
176 |
|
|
177 |
|
final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
|
178 |
|
final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
|
179 |
|
|
180 |
|
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
|
181 |
|
|
182 |
|
final ScoreResult sr = similarity(algo, pivot, curr);
|
183 |
|
emitOutput(sr, idPivot, idCurr, context);
|
184 |
|
i++;
|
185 |
|
}
|
|
78 |
private void emitRel(final Context context, final String type, final String from, final String to) {
|
|
79 |
final Put put = new Put(Bytes.toBytes(from)).add(DedupUtils.getSimilarityCFBytes(type), Bytes.toBytes(to), Bytes.toBytes(""));
|
|
80 |
put.setDurability(Durability.SKIP_WAL);
|
|
81 |
ibw.set(Bytes.toBytes(from));
|
|
82 |
try {
|
|
83 |
context.write(ibw, put);
|
|
84 |
} catch (IOException | InterruptedException e) {
|
|
85 |
e.printStackTrace();
|
186 |
86 |
}
|
187 |
87 |
}
|
188 |
|
}
|
|
88 |
});
|
189 |
89 |
}
|
190 |
90 |
|
191 |
|
private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Context context) throws IOException, InterruptedException {
|
192 |
|
final double d = sr.getScore();
|
193 |
|
|
194 |
|
if (d >= dedupConf.getWf().getThreshold()) {
|
195 |
|
writeSimilarity(context, idPivot, idCurr, d);
|
196 |
|
context.getCounter(dedupConf.getWf().getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
|
197 |
|
} else {
|
198 |
|
context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
|
199 |
|
}
|
200 |
|
}
|
201 |
|
|
202 |
|
private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
|
203 |
|
try {
|
204 |
|
return algo.between(a, b, dedupConf);
|
205 |
|
} catch(Throwable e) {
|
206 |
|
log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
|
207 |
|
throw new IllegalArgumentException(e);
|
208 |
|
}
|
209 |
|
}
|
210 |
|
|
211 |
|
private boolean mustSkip(final String idPivot) {
|
212 |
|
return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
|
213 |
|
}
|
214 |
|
|
215 |
|
private String getNsPrefix(final String id) {
|
216 |
|
return StringUtils.substringBetween(id, "|", "::");
|
217 |
|
}
|
218 |
|
|
219 |
|
private void writeSimilarity(final Context context, final String idPivot, final String id, final double d) throws IOException, InterruptedException {
|
220 |
|
final byte[] rowKey = Bytes.toBytes(idPivot);
|
221 |
|
final byte[] target = Bytes.toBytes(id);
|
222 |
|
|
223 |
|
//log.info("writing similarity: " + idPivot + " <-> " + id);
|
224 |
|
|
225 |
|
emitRel(context, rowKey, target, d);
|
226 |
|
emitRel(context, target, rowKey, d);
|
227 |
|
}
|
228 |
|
|
229 |
|
private void emitRel(final Context context, final byte[] from, final byte[] to, final double d) throws IOException, InterruptedException {
|
230 |
|
final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
|
231 |
|
|
232 |
|
//final OafRel.Builder rel = DedupUtils.getDedupSimilarity(dedupConf, new String(from), new String(to));
|
233 |
|
//final Oaf.Builder oaf = DedupUtils.buildRel(dedupConf, rel, d);
|
234 |
|
|
235 |
|
//final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, oaf.build().toByteArray());
|
236 |
|
final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, Bytes.toBytes(""));
|
237 |
|
put.setDurability(Durability.SKIP_WAL);
|
238 |
|
ibw.set(from);
|
239 |
|
context.write(ibw, put);
|
240 |
|
}
|
241 |
91 |
}
|
introduced use of BlockProcessor