1 |
26600
|
sandro.lab
|
package eu.dnetlib.data.mapreduce.hbase.dedup;
|
2 |
|
|
|
3 |
|
|
import java.io.IOException;
|
4 |
|
|
import java.util.HashSet;
|
5 |
|
|
import java.util.LinkedList;
|
6 |
|
|
import java.util.List;
|
7 |
|
|
import java.util.PriorityQueue;
|
8 |
|
|
import java.util.Queue;
|
9 |
|
|
import java.util.Set;
|
10 |
|
|
|
11 |
|
|
import org.apache.commons.lang.StringUtils;
|
12 |
|
|
import org.apache.hadoop.hbase.client.Put;
|
13 |
|
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
14 |
|
|
import org.apache.hadoop.hbase.mapreduce.TableReducer;
|
15 |
|
|
import org.apache.hadoop.hbase.util.Bytes;
|
16 |
|
|
import org.apache.hadoop.io.Text;
|
17 |
|
|
|
18 |
|
|
import com.google.common.collect.Lists;
|
19 |
|
|
|
20 |
28094
|
claudio.at
|
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
21 |
|
|
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
|
22 |
28411
|
claudio.at
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
23 |
26600
|
sandro.lab
|
import eu.dnetlib.pace.clustering.NGramUtils;
|
24 |
|
|
import eu.dnetlib.pace.config.Config;
|
25 |
|
|
import eu.dnetlib.pace.config.DynConf;
|
26 |
|
|
import eu.dnetlib.pace.distance.PaceDocumentDistance;
|
27 |
33137
|
claudio.at
|
import eu.dnetlib.pace.model.FieldList;
|
28 |
26600
|
sandro.lab
|
import eu.dnetlib.pace.model.MapDocument;
|
29 |
|
|
import eu.dnetlib.pace.model.MapDocumentComparator;
|
30 |
|
|
import eu.dnetlib.pace.model.MapDocumentSerializer;
|
31 |
|
|
import eu.dnetlib.pace.util.DedupConfig;
|
32 |
|
|
import eu.dnetlib.pace.util.DedupConfigLoader;
|
33 |
|
|
|
34 |
|
|
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
|
35 |
|
|
|
36 |
|
|
private static final boolean WRITE_TO_WAL = false;
|
37 |
28981
|
claudio.at
|
// private static final int LIMIT = 2000;
|
38 |
|
|
// private static final int FIELD_LIMIT = 10;
|
39 |
|
|
// private static final int WINDOW_SIZE = 200;
|
40 |
26600
|
sandro.lab
|
|
41 |
|
|
private Config paceConf;
|
42 |
|
|
private DedupConfig dedupConf;
|
43 |
|
|
|
44 |
|
|
private ImmutableBytesWritable ibw;
|
45 |
|
|
|
46 |
|
|
@Override
|
47 |
28411
|
claudio.at
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
48 |
26600
|
sandro.lab
|
paceConf = DynConf.load(context.getConfiguration().get("dedup.pace.conf"));
|
49 |
|
|
dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
|
50 |
|
|
ibw = new ImmutableBytesWritable();
|
51 |
|
|
|
52 |
|
|
System.out.println("dedup reduce phase \npace conf: " + paceConf.fields() + "\nwf conf: " + dedupConf.toString());
|
53 |
|
|
}
|
54 |
|
|
|
55 |
|
|
@Override
|
56 |
28411
|
claudio.at
|
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
|
57 |
26600
|
sandro.lab
|
System.out.println("\nReducing key: '" + key + "'");
|
58 |
|
|
|
59 |
|
|
final Queue<MapDocument> q = prepare(context, key, values);
|
60 |
28411
|
claudio.at
|
switch (Type.valueOf(dedupConf.getEntityType())) {
|
61 |
26600
|
sandro.lab
|
case person:
|
62 |
|
|
process(q, context);
|
63 |
|
|
break;
|
64 |
|
|
case result:
|
65 |
|
|
process(simplifyQueue(q, key.toString(), context), context);
|
66 |
|
|
break;
|
67 |
|
|
case organization:
|
68 |
|
|
process(q, context);
|
69 |
|
|
break;
|
70 |
|
|
default:
|
71 |
28411
|
claudio.at
|
throw new IllegalArgumentException("dedup not implemented for type: " + dedupConf.getEntityType());
|
72 |
26600
|
sandro.lab
|
}
|
73 |
|
|
}
|
74 |
|
|
|
75 |
28411
|
claudio.at
|
private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
|
76 |
26600
|
sandro.lab
|
final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getOrderField()));
|
77 |
|
|
|
78 |
|
|
final Set<String> seen = new HashSet<String>();
|
79 |
|
|
|
80 |
|
|
for (ImmutableBytesWritable i : values) {
|
81 |
|
|
MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
|
82 |
|
|
String id = doc.getIdentifier();
|
83 |
|
|
|
84 |
|
|
if (!seen.contains(id)) {
|
85 |
|
|
seen.add(id);
|
86 |
|
|
queue.add(doc);
|
87 |
|
|
}
|
88 |
|
|
|
89 |
28981
|
claudio.at
|
if (queue.size() > dedupConf.getQueueMaxSize()) {
|
90 |
26600
|
sandro.lab
|
// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
|
91 |
28981
|
claudio.at
|
context.getCounter("ngram size > " + dedupConf.getQueueMaxSize(), "N").increment(1);
|
92 |
|
|
System.out.println("breaking out after limit (" + dedupConf.getQueueMaxSize() + ") for ngram '" + key);
|
93 |
26600
|
sandro.lab
|
break;
|
94 |
|
|
}
|
95 |
|
|
}
|
96 |
|
|
|
97 |
|
|
return queue;
|
98 |
|
|
}
|
99 |
|
|
|
100 |
28411
|
claudio.at
|
private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
|
101 |
26600
|
sandro.lab
|
final Queue<MapDocument> q = new LinkedList<MapDocument>();
|
102 |
|
|
|
103 |
|
|
String fieldRef = "";
|
104 |
|
|
List<MapDocument> tempResults = Lists.newArrayList();
|
105 |
|
|
|
106 |
|
|
while (!queue.isEmpty()) {
|
107 |
|
|
MapDocument result = queue.remove();
|
108 |
|
|
|
109 |
|
|
if (!result.values(dedupConf.getOrderField()).isEmpty()) {
|
110 |
33137
|
claudio.at
|
String field = NGramUtils.cleanupForOrdering(result.values(dedupConf.getOrderField()).stringValue());
|
111 |
26600
|
sandro.lab
|
if (field.equals(fieldRef)) {
|
112 |
|
|
tempResults.add(result);
|
113 |
|
|
} else {
|
114 |
|
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
115 |
|
|
tempResults.clear();
|
116 |
|
|
tempResults.add(result);
|
117 |
|
|
fieldRef = field;
|
118 |
|
|
}
|
119 |
|
|
} else {
|
120 |
28411
|
claudio.at
|
context.getCounter(dedupConf.getEntityType(), "missing " + dedupConf.getOrderField()).increment(1);
|
121 |
26600
|
sandro.lab
|
}
|
122 |
|
|
}
|
123 |
|
|
populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
|
124 |
|
|
|
125 |
|
|
return q;
|
126 |
|
|
}
|
127 |
|
|
|
128 |
28411
|
claudio.at
|
private void populateSimplifiedQueue(final Queue<MapDocument> q,
|
129 |
|
|
final List<MapDocument> tempResults,
|
130 |
|
|
final Context context,
|
131 |
|
|
final String fieldRef,
|
132 |
|
|
final String ngram) {
|
133 |
28981
|
claudio.at
|
if (tempResults.size() < dedupConf.getGroupMaxSize()) {
|
134 |
26600
|
sandro.lab
|
q.addAll(tempResults);
|
135 |
|
|
} else {
|
136 |
28981
|
claudio.at
|
context.getCounter(dedupConf.getEntityType(), "Skipped records for count(" + dedupConf.getOrderField() + ") >= " + dedupConf.getGroupMaxSize())
|
137 |
33137
|
claudio.at
|
.increment(tempResults.size());
|
138 |
26600
|
sandro.lab
|
System.out.println("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
|
139 |
|
|
}
|
140 |
|
|
}
|
141 |
|
|
|
142 |
28411
|
claudio.at
|
private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
|
143 |
26600
|
sandro.lab
|
|
144 |
|
|
final PaceDocumentDistance algo = new PaceDocumentDistance();
|
145 |
|
|
|
146 |
|
|
while (!queue.isEmpty()) {
|
147 |
|
|
|
148 |
|
|
final MapDocument pivot = queue.remove();
|
149 |
|
|
final String idPivot = pivot.getIdentifier();
|
150 |
|
|
|
151 |
33137
|
claudio.at
|
final FieldList fieldsPivot = pivot.values(dedupConf.getOrderField());
|
152 |
|
|
final String fieldPivot = fieldsPivot == null || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
|
153 |
26600
|
sandro.lab
|
|
154 |
|
|
if (fieldPivot != null) {
|
155 |
|
|
// System.out.println(idPivot + " --> " + fieldPivot);
|
156 |
|
|
|
157 |
|
|
int i = 0;
|
158 |
|
|
for (MapDocument curr : queue) {
|
159 |
|
|
final String idCurr = curr.getIdentifier();
|
160 |
|
|
|
161 |
|
|
if (mustSkip(idCurr)) {
|
162 |
28411
|
claudio.at
|
context.getCounter(dedupConf.getEntityType(), "skip list").increment(1);
|
163 |
26600
|
sandro.lab
|
break;
|
164 |
|
|
}
|
165 |
|
|
|
166 |
28981
|
claudio.at
|
if (i > dedupConf.getSlidingWindowSize()) {
|
167 |
26600
|
sandro.lab
|
break;
|
168 |
|
|
}
|
169 |
|
|
|
170 |
33137
|
claudio.at
|
final FieldList fieldsCurr = curr.values(dedupConf.getOrderField());
|
171 |
|
|
final String fieldCurr = fieldsCurr == null || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
|
172 |
26600
|
sandro.lab
|
|
173 |
33137
|
claudio.at
|
if (!idCurr.equals(idPivot) && fieldCurr != null) {
|
174 |
26600
|
sandro.lab
|
|
175 |
|
|
double d = algo.between(pivot, curr, paceConf);
|
176 |
|
|
|
177 |
|
|
if (d >= dedupConf.getThreshold()) {
|
178 |
|
|
writeSimilarity(context, idPivot, idCurr);
|
179 |
28411
|
claudio.at
|
context.getCounter(dedupConf.getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
|
180 |
26600
|
sandro.lab
|
} else {
|
181 |
28411
|
claudio.at
|
context.getCounter(dedupConf.getEntityType(), "d < " + dedupConf.getThreshold()).increment(1);
|
182 |
26600
|
sandro.lab
|
}
|
183 |
|
|
i++;
|
184 |
|
|
}
|
185 |
|
|
}
|
186 |
|
|
}
|
187 |
|
|
}
|
188 |
|
|
}
|
189 |
|
|
|
190 |
|
|
private boolean mustSkip(final String idPivot) {
|
191 |
|
|
return dedupConf.getSkipList().contains(getNsPrefix(idPivot));
|
192 |
|
|
}
|
193 |
|
|
|
194 |
28411
|
claudio.at
|
private String getNsPrefix(final String id) {
|
195 |
26600
|
sandro.lab
|
return StringUtils.substringBetween(id, "|", "::");
|
196 |
|
|
}
|
197 |
|
|
|
198 |
28411
|
claudio.at
|
private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
|
199 |
26600
|
sandro.lab
|
byte[] rowKey = Bytes.toBytes(idPivot);
|
200 |
|
|
byte[] target = Bytes.toBytes(id);
|
201 |
|
|
|
202 |
|
|
emitRel(context, rowKey, target);
|
203 |
|
|
emitRel(context, target, rowKey);
|
204 |
|
|
}
|
205 |
|
|
|
206 |
28411
|
claudio.at
|
private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
|
207 |
|
|
Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), to, Bytes.toBytes(""));
|
208 |
26600
|
sandro.lab
|
put.setWriteToWAL(WRITE_TO_WAL);
|
209 |
|
|
ibw.set(from);
|
210 |
|
|
context.write(ibw, put);
|
211 |
|
|
}
|
212 |
|
|
}
|