1
|
package eu.dnetlib.data.mapreduce.hbase.dedup;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.*;
|
5
|
|
6
|
import com.google.common.collect.Lists;
|
7
|
import eu.dnetlib.data.mapreduce.JobParams;
|
8
|
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
9
|
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
|
10
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
11
|
import eu.dnetlib.pace.clustering.NGramUtils;
|
12
|
import eu.dnetlib.pace.config.DedupConfig;
|
13
|
import eu.dnetlib.pace.distance.PaceDocumentDistance;
|
14
|
import eu.dnetlib.pace.model.Field;
|
15
|
import eu.dnetlib.pace.model.MapDocument;
|
16
|
import eu.dnetlib.pace.model.MapDocumentComparator;
|
17
|
import eu.dnetlib.pace.model.MapDocumentSerializer;
|
18
|
import org.apache.commons.lang.StringUtils;
|
19
|
import org.apache.commons.logging.Log;
|
20
|
import org.apache.commons.logging.LogFactory;
|
21
|
import org.apache.hadoop.hbase.client.Put;
|
22
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
23
|
import org.apache.hadoop.hbase.mapreduce.TableReducer;
|
24
|
import org.apache.hadoop.hbase.util.Bytes;
|
25
|
import org.apache.hadoop.io.Text;
|
26
|
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
|
27
|
|
28
|
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
|
29
|
|
30
|
private static final Log log = LogFactory.getLog(DedupReducer.class);
|
31
|
|
32
|
private static final boolean WRITE_TO_WAL = false;
|
33
|
|
34
|
private DedupConfig dedupConf;
|
35
|
|
36
|
private ImmutableBytesWritable ibw;
|
37
|
|
38
|
@Override
|
39
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
40
|
|
41
|
dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
|
42
|
ibw = new ImmutableBytesWritable();
|
43
|
|
44
|
log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
|
45
|
}
|
46
|
|
47
|
@Override
|
48
|
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
|
49
|
|
50
|
final Queue<MapDocument> q = prepare(context, key, values);
|
51
|
|
52
|
if (q.size() > 1) {
|
53
|
log.info("reducing key: '" + key + "' records: " + q.size());
|
54
|
|
55
|
switch (Type.valueOf(dedupConf.getWf().getEntityType())) {
|
56
|
case person:
|
57
|
process(q, context);
|
58
|
break;
|
59
|
case result:
|
60
|
process(simplifyQueue(q, key.toString(), context), context);
|
61
|
break;
|
62
|
case organization:
|
63
|
process(q, context);
|
64
|
break;
|
65
|
default:
|
66
|
throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
|
67
|
}
|
68
|
} else {
|
69
|
context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
|
70
|
}
|
71
|
}
|
72
|
|
73
|
private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
|
74
|
final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
|
75
|
|
76
|
final Set<String> seen = new HashSet<String>();
|
77
|
final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
|
78
|
int count = 0;
|
79
|
boolean logged = false;
|
80
|
|
81
|
for (final ImmutableBytesWritable i : values) {
|
82
|
count++;
|
83
|
|
84
|
if (queue.size() <= queueMaxSize) {
|
85
|
final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
|
86
|
final String id = doc.getIdentifier();
|
87
|
|
88
|
if (!seen.contains(id)) {
|
89
|
seen.add(id);
|
90
|
queue.add(doc);
|
91
|
}
|
92
|
|
93
|
} else {
|
94
|
if (!logged) {
|
95
|
// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
|
96
|
context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
|
97
|
log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
|
98
|
logged = true;
|
99
|
}
|
100
|
}
|
101
|
}
|
102
|
|
103
|
log.info(String.format("cluster key '%s' size '%s'", key, count));
|
104
|
|
105
|
return queue;
|
106
|
}
|
107
|
|
108
|
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
|
109
|
final Queue<MapDocument> q = new LinkedList<MapDocument>();
|
110
|
|
111
|
String fieldRef = "";
|
112
|
final List<MapDocument> tempResults = Lists.newArrayList();
|
113
|
|
114
|
while (!queue.isEmpty()) {
|
115
|
final MapDocument result = queue.remove();
|
116
|
|
117
|
final String orderFieldName = dedupConf.getWf().getOrderField();
|
118
|
final Field orderFieldValue = result.values(orderFieldName);
|
119
|
if (!orderFieldValue.isEmpty()) {
|
120
|
final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
|
121
|
if (field.equals(fieldRef)) {
|
122
|
tempResults.add(result);
|
123
|
} else {
|
124
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
125
|
tempResults.clear();
|
126
|
tempResults.add(result);
|
127
|
fieldRef = field;
|
128
|
}
|
129
|
} else {
|
130
|
context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
|
131
|
}
|
132
|
}
|
133
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
134
|
|
135
|
return q;
|
136
|
}
|
137
|
|
138
|
private void populateSimplifiedQueue(final Queue<MapDocument> q,
|
139
|
final List<MapDocument> tempResults,
|
140
|
final Context context,
|
141
|
final String fieldRef,
|
142
|
final String ngram) {
|
143
|
if (tempResults.size() < dedupConf.getWf().getGroupMaxSize()) {
|
144
|
q.addAll(tempResults);
|
145
|
} else {
|
146
|
context.getCounter(dedupConf.getWf().getEntityType(),
|
147
|
"Skipped records for count(" + dedupConf.getWf().getOrderField() + ") >= " + dedupConf.getWf().getGroupMaxSize())
|
148
|
.increment(tempResults.size());
|
149
|
log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
|
150
|
}
|
151
|
}
|
152
|
|
153
|
private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
|
154
|
|
155
|
final PaceDocumentDistance algo = new PaceDocumentDistance();
|
156
|
|
157
|
while (!queue.isEmpty()) {
|
158
|
|
159
|
final MapDocument pivot = queue.remove();
|
160
|
final String idPivot = pivot.getIdentifier();
|
161
|
|
162
|
final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
|
163
|
final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
|
164
|
|
165
|
if (fieldPivot != null) {
|
166
|
// System.out.println(idPivot + " --> " + fieldPivot);
|
167
|
|
168
|
int i = 0;
|
169
|
for (final MapDocument curr : queue) {
|
170
|
final String idCurr = curr.getIdentifier();
|
171
|
|
172
|
if (mustSkip(idCurr)) {
|
173
|
context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
|
174
|
break;
|
175
|
}
|
176
|
|
177
|
if (i > dedupConf.getWf().getSlidingWindowSize()) {
|
178
|
break;
|
179
|
}
|
180
|
|
181
|
final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
|
182
|
final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
|
183
|
|
184
|
if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
|
185
|
|
186
|
final double d = similarity(algo, pivot, curr);
|
187
|
|
188
|
if (d >= dedupConf.getWf().getThreshold()) {
|
189
|
writeSimilarity(context, idPivot, idCurr);
|
190
|
context.getCounter(dedupConf.getWf().getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
|
191
|
} else {
|
192
|
context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
|
193
|
}
|
194
|
i++;
|
195
|
}
|
196
|
}
|
197
|
}
|
198
|
}
|
199
|
}
|
200
|
|
201
|
private double similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
|
202
|
try {
|
203
|
return algo.between(a, b, dedupConf);
|
204
|
} catch(Throwable e) {
|
205
|
log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
|
206
|
throw new IllegalArgumentException(e);
|
207
|
}
|
208
|
}
|
209
|
|
210
|
private boolean mustSkip(final String idPivot) {
|
211
|
return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
|
212
|
}
|
213
|
|
214
|
private String getNsPrefix(final String id) {
|
215
|
return StringUtils.substringBetween(id, "|", "::");
|
216
|
}
|
217
|
|
218
|
private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
|
219
|
final byte[] rowKey = Bytes.toBytes(idPivot);
|
220
|
final byte[] target = Bytes.toBytes(id);
|
221
|
|
222
|
//log.info("writing similarity: " + idPivot + " <-> " + id);
|
223
|
|
224
|
emitRel(context, rowKey, target);
|
225
|
emitRel(context, target, rowKey);
|
226
|
}
|
227
|
|
228
|
private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
|
229
|
final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getWf().getEntityType())), to, Bytes.toBytes(""));
|
230
|
put.setWriteToWAL(WRITE_TO_WAL);
|
231
|
ibw.set(from);
|
232
|
context.write(ibw, put);
|
233
|
}
|
234
|
}
|