1
|
package eu.dnetlib.data.mapreduce.hbase.dedup;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.HashSet;
|
5
|
import java.util.LinkedList;
|
6
|
import java.util.List;
|
7
|
import java.util.PriorityQueue;
|
8
|
import java.util.Queue;
|
9
|
import java.util.Set;
|
10
|
|
11
|
import org.apache.commons.lang.StringUtils;
|
12
|
import org.apache.hadoop.hbase.client.Put;
|
13
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
14
|
import org.apache.hadoop.hbase.mapreduce.TableReducer;
|
15
|
import org.apache.hadoop.hbase.util.Bytes;
|
16
|
import org.apache.hadoop.io.Text;
|
17
|
|
18
|
import com.google.common.collect.Lists;
|
19
|
|
20
|
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
21
|
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
|
22
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
23
|
import eu.dnetlib.pace.clustering.NGramUtils;
|
24
|
import eu.dnetlib.pace.config.Config;
|
25
|
import eu.dnetlib.pace.config.DynConf;
|
26
|
import eu.dnetlib.pace.distance.PaceDocumentDistance;
|
27
|
import eu.dnetlib.pace.model.FieldList;
|
28
|
import eu.dnetlib.pace.model.MapDocument;
|
29
|
import eu.dnetlib.pace.model.MapDocumentComparator;
|
30
|
import eu.dnetlib.pace.model.MapDocumentSerializer;
|
31
|
import eu.dnetlib.pace.util.DedupConfig;
|
32
|
import eu.dnetlib.pace.util.DedupConfigLoader;
|
33
|
|
34
|
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
|
35
|
|
36
|
private static final boolean WRITE_TO_WAL = false;
|
37
|
// private static final int LIMIT = 2000;
|
38
|
// private static final int FIELD_LIMIT = 10;
|
39
|
// private static final int WINDOW_SIZE = 200;
|
40
|
|
41
|
private Config paceConf;
|
42
|
private DedupConfig dedupConf;
|
43
|
|
44
|
private ImmutableBytesWritable ibw;
|
45
|
|
46
|
@Override
|
47
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
48
|
paceConf = DynConf.load(context.getConfiguration().get("dedup.pace.conf"));
|
49
|
dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
|
50
|
ibw = new ImmutableBytesWritable();
|
51
|
|
52
|
System.out.println("dedup reduce phase \npace conf: " + paceConf.fields() + "\nwf conf: " + dedupConf.toString());
|
53
|
}
|
54
|
|
55
|
@Override
|
56
|
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
|
57
|
System.out.println("\nReducing key: '" + key + "'");
|
58
|
|
59
|
final Queue<MapDocument> q = prepare(context, key, values);
|
60
|
switch (Type.valueOf(dedupConf.getEntityType())) {
|
61
|
case person:
|
62
|
process(q, context);
|
63
|
break;
|
64
|
case result:
|
65
|
process(simplifyQueue(q, key.toString(), context), context);
|
66
|
break;
|
67
|
case organization:
|
68
|
process(q, context);
|
69
|
break;
|
70
|
default:
|
71
|
throw new IllegalArgumentException("dedup not implemented for type: " + dedupConf.getEntityType());
|
72
|
}
|
73
|
}
|
74
|
|
75
|
private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
|
76
|
final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getOrderField()));
|
77
|
|
78
|
final Set<String> seen = new HashSet<String>();
|
79
|
|
80
|
for (ImmutableBytesWritable i : values) {
|
81
|
MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
|
82
|
String id = doc.getIdentifier();
|
83
|
|
84
|
if (!seen.contains(id)) {
|
85
|
seen.add(id);
|
86
|
queue.add(doc);
|
87
|
}
|
88
|
|
89
|
if (queue.size() > dedupConf.getQueueMaxSize()) {
|
90
|
// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
|
91
|
context.getCounter("ngram size > " + dedupConf.getQueueMaxSize(), "N").increment(1);
|
92
|
System.out.println("breaking out after limit (" + dedupConf.getQueueMaxSize() + ") for ngram '" + key);
|
93
|
break;
|
94
|
}
|
95
|
}
|
96
|
|
97
|
return queue;
|
98
|
}
|
99
|
|
100
|
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
|
101
|
final Queue<MapDocument> q = new LinkedList<MapDocument>();
|
102
|
|
103
|
String fieldRef = "";
|
104
|
List<MapDocument> tempResults = Lists.newArrayList();
|
105
|
|
106
|
while (!queue.isEmpty()) {
|
107
|
MapDocument result = queue.remove();
|
108
|
|
109
|
if (!result.values(dedupConf.getOrderField()).isEmpty()) {
|
110
|
String field = NGramUtils.cleanupForOrdering(result.values(dedupConf.getOrderField()).stringValue());
|
111
|
if (field.equals(fieldRef)) {
|
112
|
tempResults.add(result);
|
113
|
} else {
|
114
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
115
|
tempResults.clear();
|
116
|
tempResults.add(result);
|
117
|
fieldRef = field;
|
118
|
}
|
119
|
} else {
|
120
|
context.getCounter(dedupConf.getEntityType(), "missing " + dedupConf.getOrderField()).increment(1);
|
121
|
}
|
122
|
}
|
123
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
124
|
|
125
|
return q;
|
126
|
}
|
127
|
|
128
|
private void populateSimplifiedQueue(final Queue<MapDocument> q,
|
129
|
final List<MapDocument> tempResults,
|
130
|
final Context context,
|
131
|
final String fieldRef,
|
132
|
final String ngram) {
|
133
|
if (tempResults.size() < dedupConf.getGroupMaxSize()) {
|
134
|
q.addAll(tempResults);
|
135
|
} else {
|
136
|
context.getCounter(dedupConf.getEntityType(), "Skipped records for count(" + dedupConf.getOrderField() + ") >= " + dedupConf.getGroupMaxSize())
|
137
|
.increment(tempResults.size());
|
138
|
System.out.println("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
|
139
|
}
|
140
|
}
|
141
|
|
142
|
private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
|
143
|
|
144
|
final PaceDocumentDistance algo = new PaceDocumentDistance();
|
145
|
|
146
|
while (!queue.isEmpty()) {
|
147
|
|
148
|
final MapDocument pivot = queue.remove();
|
149
|
final String idPivot = pivot.getIdentifier();
|
150
|
|
151
|
final FieldList fieldsPivot = pivot.values(dedupConf.getOrderField());
|
152
|
final String fieldPivot = fieldsPivot == null || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
|
153
|
|
154
|
if (fieldPivot != null) {
|
155
|
// System.out.println(idPivot + " --> " + fieldPivot);
|
156
|
|
157
|
int i = 0;
|
158
|
for (MapDocument curr : queue) {
|
159
|
final String idCurr = curr.getIdentifier();
|
160
|
|
161
|
if (mustSkip(idCurr)) {
|
162
|
context.getCounter(dedupConf.getEntityType(), "skip list").increment(1);
|
163
|
break;
|
164
|
}
|
165
|
|
166
|
if (i > dedupConf.getSlidingWindowSize()) {
|
167
|
break;
|
168
|
}
|
169
|
|
170
|
final FieldList fieldsCurr = curr.values(dedupConf.getOrderField());
|
171
|
final String fieldCurr = fieldsCurr == null || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
|
172
|
|
173
|
if (!idCurr.equals(idPivot) && fieldCurr != null) {
|
174
|
|
175
|
double d = algo.between(pivot, curr, paceConf);
|
176
|
|
177
|
if (d >= dedupConf.getThreshold()) {
|
178
|
writeSimilarity(context, idPivot, idCurr);
|
179
|
context.getCounter(dedupConf.getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
|
180
|
} else {
|
181
|
context.getCounter(dedupConf.getEntityType(), "d < " + dedupConf.getThreshold()).increment(1);
|
182
|
}
|
183
|
i++;
|
184
|
}
|
185
|
}
|
186
|
}
|
187
|
}
|
188
|
}
|
189
|
|
190
|
private boolean mustSkip(final String idPivot) {
|
191
|
return dedupConf.getSkipList().contains(getNsPrefix(idPivot));
|
192
|
}
|
193
|
|
194
|
private String getNsPrefix(final String id) {
|
195
|
return StringUtils.substringBetween(id, "|", "::");
|
196
|
}
|
197
|
|
198
|
private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
|
199
|
byte[] rowKey = Bytes.toBytes(idPivot);
|
200
|
byte[] target = Bytes.toBytes(id);
|
201
|
|
202
|
emitRel(context, rowKey, target);
|
203
|
emitRel(context, target, rowKey);
|
204
|
}
|
205
|
|
206
|
private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
|
207
|
Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), to, Bytes.toBytes(""));
|
208
|
put.setWriteToWAL(WRITE_TO_WAL);
|
209
|
ibw.set(from);
|
210
|
context.write(ibw, put);
|
211
|
}
|
212
|
}
|