package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.*;

import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

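/**
 * Reduce phase of the HBase deduplication workflow: documents grouped under the same
 * clustering key are compared pairwise within a sliding window, and every pair whose
 * distance score reaches the configured threshold is written to HBase as a symmetric
 * dedupSimilarity relation.
 *
 * Illustrative job wiring (a sketch only; the actual driver, table name and configuration
 * plumbing live elsewhere in the workflow and are assumptions here):
 *
 * <pre>
 * final Job job = Job.getInstance(conf);
 * // the reducer expects the serialized pace configuration under JobParams.DEDUP_CONF
 * job.getConfiguration().set(JobParams.DEDUP_CONF, dedupConfJson);
 * TableMapReduceUtil.initTableReducerJob("db_openaire", DedupReducer.class, job);
 * </pre>
 */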
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

    private static final Log log = LogFactory.getLog(DedupReducer.class);

    private DedupConfig dedupConf;

    private ImmutableBytesWritable ibw;

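    /**
     * Loads the pace configuration serialized in the job configuration under
     * JobParams.DEDUP_CONF and prepares the reusable output key wrapper.
     */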
    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {

        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        ibw = new ImmutableBytesWritable();

        log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
    }

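    /**
     * Groups holding a single record cannot produce a similarity pair and are only counted;
     * larger groups are compared pairwise. Result records go through simplifyQueue() first,
     * organizations are processed as-is; other entity types are not supported here.
     */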
    @Override
    protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {

        final Queue<MapDocument> q = prepare(context, key, values);

        if (q.size() > 1) {
            log.info("reducing key: '" + key + "' records: " + q.size());

            switch (Type.valueOf(dedupConf.getWf().getEntityType())) {
            case result:
                process(simplifyQueue(q, key.toString(), context), context);
                break;
            case organization:
                process(q, context);
                break;
            default:
                throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
            }
        } else {
            context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
        }
    }

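    /**
     * Deserializes the grouped values into a queue ordered by the configured order field,
     * discarding duplicate identifiers and accepting at most queueMaxSize documents per key.
     */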
    private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
        final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));

        final Set<String> seen = new HashSet<String>();
        final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();

        boolean logged = false;

        for (final ImmutableBytesWritable i : values) {
            if (queue.size() <= queueMaxSize) {
                final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
                final String id = doc.getIdentifier();

                if (!seen.contains(id)) {
                    seen.add(id);
                    queue.add(doc);
                }

            } else {
                // log and count the overflow only once per key
                if (!logged) {
                    context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
                    log.info("queue limit (" + queueMaxSize + ") reached for ngram '" + key + "', ignoring further records");
                    logged = true;
                }
            }
        }

        return queue;
    }

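    /**
     * Partitions the ordered queue into runs of records sharing the same cleaned-up value of
     * the order field, forwarding each run to populateSimplifiedQueue(); records missing the
     * order field are dropped and counted.
     */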
    private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
        final Queue<MapDocument> q = new LinkedList<MapDocument>();

        String fieldRef = "";
        final List<MapDocument> tempResults = Lists.newArrayList();

        while (!queue.isEmpty()) {
            final MapDocument result = queue.remove();

            final String orderFieldName = dedupConf.getWf().getOrderField();
            final Field orderFieldValue = result.values(orderFieldName);
            if (!orderFieldValue.isEmpty()) {
                final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
                if (field.equals(fieldRef)) {
                    tempResults.add(result);
                } else {
                    populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
                    tempResults.clear();
                    tempResults.add(result);
                    fieldRef = field;
                }
            } else {
                context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
            }
        }
        populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);

        return q;
    }

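    /**
     * Adds the current run of records to the simplified queue only when it is smaller than
     * groupMaxSize; oversized runs (too many records sharing the same field value) are
     * skipped entirely and counted.
     */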
    private void populateSimplifiedQueue(final Queue<MapDocument> q,
            final List<MapDocument> tempResults,
            final Context context,
            final String fieldRef,
            final String ngram) {
        if (tempResults.size() < dedupConf.getWf().getGroupMaxSize()) {
            q.addAll(tempResults);
        } else {
            context.getCounter(dedupConf.getWf().getEntityType(),
                    "Skipped records for count(" + dedupConf.getWf().getOrderField() + ") >= " + dedupConf.getWf().getGroupMaxSize())
                    .increment(tempResults.size());
            log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
        }
    }

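    /**
     * Compares each document (the pivot) against the documents that follow it in the ordered
     * queue, within the configured sliding window, and emits a similarity relation for every
     * pair scoring at or above the threshold.
     */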
    private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {

        final PaceDocumentDistance algo = new PaceDocumentDistance();

        while (!queue.isEmpty()) {

            final MapDocument pivot = queue.remove();
            final String idPivot = pivot.getIdentifier();

            final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
            final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();

            if (fieldPivot != null) {
                // System.out.println(idPivot + " --> " + fieldPivot);

                int i = 0;
                for (final MapDocument curr : queue) {
                    final String idCurr = curr.getIdentifier();

                    if (mustSkip(idCurr)) {
                        context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
                        break;
                    }

                    if (i > dedupConf.getWf().getSlidingWindowSize()) {
                        break;
                    }

                    final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
                    final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();

                    if (!idCurr.equals(idPivot) && (fieldCurr != null)) {

                        final ScoreResult sr = similarity(algo, pivot, curr);
                        emitOutput(sr, idPivot, idCurr, context);
                        i++;
                    }
                }
            }
        }
    }

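    /**
     * Persists the pair as a similarity relation when the score reaches the configured
     * threshold, otherwise only updates the below-threshold counter.
     */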
    private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Context context) throws IOException, InterruptedException {
        final double d = sr.getScore();

        if (d >= dedupConf.getWf().getThreshold()) {
            writeSimilarity(context, idPivot, idCurr, d);
            context.getCounter(dedupConf.getWf().getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
        } else {
            context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
        }
    }

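    /**
     * Delegates to the pace distance algorithm, logging both documents before rethrowing any
     * failure so that problematic record pairs can be identified from the task logs.
     */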
    private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
        try {
            return algo.between(a, b, dedupConf);
        } catch (Throwable e) {
            log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
            throw new IllegalArgumentException(e);
        }
    }

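    /**
     * Checks whether the record's namespace prefix appears in the workflow skip list; for an
     * identifier shaped like "50|prefix::hash", getNsPrefix() returns "prefix".
     */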
    private boolean mustSkip(final String idPivot) {
        return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
    }

    private String getNsPrefix(final String id) {
        return StringUtils.substringBetween(id, "|", "::");
    }

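    /**
     * Writes the similarity in both directions, so each member of the pair can find the
     * relation in its own row; this is why the dedupSimilarity counter is labelled "(x2)".
     */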
    private void writeSimilarity(final Context context, final String idPivot, final String id, final double d) throws IOException, InterruptedException {
        final byte[] rowKey = Bytes.toBytes(idPivot);
        final byte[] target = Bytes.toBytes(id);

        // log.info("writing similarity: " + idPivot + " <-> " + id);

        emitRel(context, rowKey, target, d);
        emitRel(context, target, rowKey, d);
    }

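    /**
     * Emits a Put against the similarity column family of the current entity type. The cell
     * value is intentionally empty: the relation is carried by the row key / qualifier pair,
     * while the commented-out lines show the previously serialized Oaf payload.
     */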
    private void emitRel(final Context context, final byte[] from, final byte[] to, final double d) throws IOException, InterruptedException {
        final Type type = Type.valueOf(dedupConf.getWf().getEntityType());

        // final OafRel.Builder rel = DedupUtils.getDedupSimilarity(dedupConf, new String(from), new String(to));
        // final Oaf.Builder oaf = DedupUtils.buildRel(dedupConf, rel, d);

        // final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, oaf.build().toByteArray());
        final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, Bytes.toBytes(""));
        put.setWriteToWAL(JobParams.WRITE_TO_WAL);
        ibw.set(from);
        context.write(ibw, put);
    }
}