1
|
package eu.dnetlib.data.mapreduce.hbase.dedup;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.*;
|
5
|
|
6
|
import com.google.common.collect.Lists;
|
7
|
import eu.dnetlib.data.mapreduce.JobParams;
|
8
|
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
9
|
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
|
10
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
11
|
import eu.dnetlib.pace.clustering.NGramUtils;
|
12
|
import eu.dnetlib.pace.config.DedupConfig;
|
13
|
import eu.dnetlib.pace.config.WfConfig;
|
14
|
import eu.dnetlib.pace.distance.PaceDocumentDistance;
|
15
|
import eu.dnetlib.pace.distance.eval.ScoreResult;
|
16
|
import eu.dnetlib.pace.model.Field;
|
17
|
import eu.dnetlib.pace.model.MapDocument;
|
18
|
import eu.dnetlib.pace.model.MapDocumentComparator;
|
19
|
import eu.dnetlib.pace.model.MapDocumentSerializer;
|
20
|
import org.apache.commons.lang.StringUtils;
|
21
|
import org.apache.commons.logging.Log;
|
22
|
import org.apache.commons.logging.LogFactory;
|
23
|
import org.apache.hadoop.hbase.client.Durability;
|
24
|
import org.apache.hadoop.hbase.client.Put;
|
25
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
26
|
import org.apache.hadoop.hbase.mapreduce.TableReducer;
|
27
|
import org.apache.hadoop.hbase.util.Bytes;
|
28
|
import org.apache.hadoop.io.Text;
|
29
|
|
30
|
/**
 * Reduce side of the dedup candidate-matching phase: receives the records grouped
 * under the same clustering (ngram) key, compares them pairwise with a sliding
 * window, and writes the resulting similarity relations into HBase.
 */
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

	private static final Log log = LogFactory.getLog(DedupReducer.class);

	// pace/dedup workflow configuration, loaded once in setup()
	private DedupConfig dedupConf;

	// reusable output key wrapper, allocated once in setup()
	private ImmutableBytesWritable ibw;
|
37
|
|
38
|
@Override
|
39
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
40
|
|
41
|
dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
|
42
|
ibw = new ImmutableBytesWritable();
|
43
|
|
44
|
log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
|
45
|
}
|
46
|
|
47
|
@Override
|
48
|
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
|
49
|
|
50
|
final Queue<MapDocument> q = prepare(context, key, values);
|
51
|
|
52
|
if (q.size() > 1) {
|
53
|
log.info("reducing key: '" + key + "' records: " + q.size());
|
54
|
|
55
|
final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
|
56
|
|
57
|
switch (type) {
|
58
|
case result:
|
59
|
process(simplifyQueue(q, key.toString(), context), context);
|
60
|
break;
|
61
|
case organization:
|
62
|
process(q, context);
|
63
|
break;
|
64
|
default:
|
65
|
throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
|
66
|
}
|
67
|
} else {
|
68
|
context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
|
69
|
}
|
70
|
}
|
71
|
|
72
|
private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
|
73
|
final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
|
74
|
|
75
|
final Set<String> seen = new HashSet<String>();
|
76
|
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
|
77
|
|
78
|
boolean logged = false;
|
79
|
|
80
|
for (final ImmutableBytesWritable i : values) {
|
81
|
if (queue.size() <= queueMaxSize) {
|
82
|
final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
|
83
|
final String id = doc.getIdentifier();
|
84
|
|
85
|
if (!seen.contains(id)) {
|
86
|
seen.add(id);
|
87
|
queue.add(doc);
|
88
|
}
|
89
|
|
90
|
} else {
|
91
|
if (!logged) {
|
92
|
// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
|
93
|
context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
|
94
|
log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
|
95
|
logged = true;
|
96
|
}
|
97
|
}
|
98
|
}
|
99
|
|
100
|
return queue;
|
101
|
}
|
102
|
|
103
|
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
|
104
|
final Queue<MapDocument> q = new LinkedList<MapDocument>();
|
105
|
|
106
|
String fieldRef = "";
|
107
|
final List<MapDocument> tempResults = Lists.newArrayList();
|
108
|
|
109
|
while (!queue.isEmpty()) {
|
110
|
final MapDocument result = queue.remove();
|
111
|
|
112
|
final String orderFieldName = dedupConf.getWf().getOrderField();
|
113
|
final Field orderFieldValue = result.values(orderFieldName);
|
114
|
if (!orderFieldValue.isEmpty()) {
|
115
|
final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
|
116
|
if (field.equals(fieldRef)) {
|
117
|
tempResults.add(result);
|
118
|
} else {
|
119
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
120
|
tempResults.clear();
|
121
|
tempResults.add(result);
|
122
|
fieldRef = field;
|
123
|
}
|
124
|
} else {
|
125
|
context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
|
126
|
}
|
127
|
}
|
128
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
129
|
|
130
|
return q;
|
131
|
}
|
132
|
|
133
|
private void populateSimplifiedQueue(final Queue<MapDocument> q,
|
134
|
final List<MapDocument> tempResults,
|
135
|
final Context context,
|
136
|
final String fieldRef,
|
137
|
final String ngram) {
|
138
|
final WfConfig wfConf = dedupConf.getWf();
|
139
|
if (tempResults.size() < wfConf.getGroupMaxSize()) {
|
140
|
q.addAll(tempResults);
|
141
|
} else {
|
142
|
context.getCounter(wfConf.getEntityType(),
|
143
|
"Skipped records for count(" + wfConf.getOrderField() + ") >= " + wfConf.getGroupMaxSize())
|
144
|
.increment(tempResults.size());
|
145
|
log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
|
146
|
}
|
147
|
}
|
148
|
|
149
|
private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
|
150
|
|
151
|
final PaceDocumentDistance algo = new PaceDocumentDistance();
|
152
|
|
153
|
while (!queue.isEmpty()) {
|
154
|
|
155
|
final MapDocument pivot = queue.remove();
|
156
|
final String idPivot = pivot.getIdentifier();
|
157
|
|
158
|
final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
|
159
|
final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
|
160
|
|
161
|
if (fieldPivot != null) {
|
162
|
// System.out.println(idPivot + " --> " + fieldPivot);
|
163
|
|
164
|
int i = 0;
|
165
|
for (final MapDocument curr : queue) {
|
166
|
final String idCurr = curr.getIdentifier();
|
167
|
|
168
|
if (mustSkip(idCurr)) {
|
169
|
context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
|
170
|
break;
|
171
|
}
|
172
|
|
173
|
if (i > dedupConf.getWf().getSlidingWindowSize()) {
|
174
|
break;
|
175
|
}
|
176
|
|
177
|
final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
|
178
|
final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
|
179
|
|
180
|
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
|
181
|
|
182
|
final ScoreResult sr = similarity(algo, pivot, curr);
|
183
|
emitOutput(sr, idPivot, idCurr, context);
|
184
|
i++;
|
185
|
}
|
186
|
}
|
187
|
}
|
188
|
}
|
189
|
}
|
190
|
|
191
|
private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Context context) throws IOException, InterruptedException {
|
192
|
final double d = sr.getScore();
|
193
|
|
194
|
if (d >= dedupConf.getWf().getThreshold()) {
|
195
|
writeSimilarity(context, idPivot, idCurr, d);
|
196
|
context.getCounter(dedupConf.getWf().getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
|
197
|
} else {
|
198
|
context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
|
199
|
}
|
200
|
}
|
201
|
|
202
|
private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
|
203
|
try {
|
204
|
return algo.between(a, b, dedupConf);
|
205
|
} catch(Throwable e) {
|
206
|
log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
|
207
|
throw new IllegalArgumentException(e);
|
208
|
}
|
209
|
}
|
210
|
|
211
|
private boolean mustSkip(final String idPivot) {
|
212
|
return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
|
213
|
}
|
214
|
|
215
|
private String getNsPrefix(final String id) {
|
216
|
return StringUtils.substringBetween(id, "|", "::");
|
217
|
}
|
218
|
|
219
|
private void writeSimilarity(final Context context, final String idPivot, final String id, final double d) throws IOException, InterruptedException {
|
220
|
final byte[] rowKey = Bytes.toBytes(idPivot);
|
221
|
final byte[] target = Bytes.toBytes(id);
|
222
|
|
223
|
//log.info("writing similarity: " + idPivot + " <-> " + id);
|
224
|
|
225
|
emitRel(context, rowKey, target, d);
|
226
|
emitRel(context, target, rowKey, d);
|
227
|
}
|
228
|
|
229
|
private void emitRel(final Context context, final byte[] from, final byte[] to, final double d) throws IOException, InterruptedException {
|
230
|
final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
|
231
|
|
232
|
//final OafRel.Builder rel = DedupUtils.getDedupSimilarity(dedupConf, new String(from), new String(to));
|
233
|
//final Oaf.Builder oaf = DedupUtils.buildRel(dedupConf, rel, d);
|
234
|
|
235
|
//final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, oaf.build().toByteArray());
|
236
|
final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, Bytes.toBytes(""));
|
237
|
put.setDurability(Durability.SKIP_WAL);
|
238
|
ibw.set(from);
|
239
|
context.write(ibw, put);
|
240
|
}
|
241
|
}
|