Revision 45419
Added by Claudio Atzori over 7 years ago

work in progress, adapting m/r jobs to the updated graph domain version
DedupReducer.java
The previous revision carried this class fully commented out (every line prefixed with //). Apart from being disabled, it matched the version below except in the similarity-relation plumbing: it imported eu.dnetlib.data.mapreduce.util.DedupUtils and eu.dnetlib.data.proto.RelTypeProtos.SubRelType, labelled the above-threshold counter SubRelType.dedupSimilarity + " (x2)" rather than the literal "dedup similarity (x2)", and its emitRel() wrote each relation into the dedicated similarity column family returned by DedupUtils.getSimilarityCFBytes(...), with the raw target id as column qualifier. This revision uncomments the class and ports it to the new table layout.
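The layout change is easiest to see with the two emitRel() write paths side by side. Both statements are lifted from the two sides of this diff; type abbreviates Type.valueOf(dedupConf.getWf().getEntityType()), so this is a condensed restatement rather than a runnable, self-contained example:

    // before (DedupUtils): dedicated similarity column family, target id as qualifier
    final Put oldPut = new Put(from)
            .add(DedupUtils.getSimilarityCFBytes(type), to, Bytes.toBytes(""));

    // after (HBaseTableDAO): shared rels column family, target id folded into the qualifier
    final byte[] qualifier = HBaseTableDAO.getSimilarityQualifierBytes(type, new String(to));
    final Put newPut = new Put(from)
            .add(HBaseTableDAO.cfRelsByte(), qualifier, Bytes.toBytes(""));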
package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.*;

import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

    private static final Log log = LogFactory.getLog(DedupReducer.class);

    private DedupConfig dedupConf;

    private ImmutableBytesWritable ibw;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
        ibw = new ImmutableBytesWritable();

        log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
    }

    @Override
    protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {

        final Queue<MapDocument> q = prepare(context, key, values);

        if (q.size() > 1) {
            log.info("reducing key: '" + key + "' records: " + q.size());

            switch (Type.valueOf(dedupConf.getWf().getEntityType())) {
            case person:
                process(q, context);
                break;
            case publication:
            case dataset:
                process(simplifyQueue(q, key.toString(), context), context);
                break;
            case organization:
                process(q, context);
                break;
            default:
                throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
            }
        } else {
            context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
        }
    }

    private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
        final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));

        final Set<String> seen = new HashSet<String>();
        final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
        int count = 0;
        boolean logged = false;

        for (final ImmutableBytesWritable i : values) {
            count++;

            if (queue.size() <= queueMaxSize) {
                final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
                final String id = doc.getIdentifier();

                if (!seen.contains(id)) {
                    seen.add(id);
                    queue.add(doc);
                }
            } else {
                if (!logged) {
                    // context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
                    context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
                    log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
                    logged = true;
                }
            }
        }

        log.info(String.format("cluster key '%s' size '%s'", key, count));

        return queue;
    }

    private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
        final Queue<MapDocument> q = new LinkedList<MapDocument>();

        String fieldRef = "";
        final List<MapDocument> tempResults = Lists.newArrayList();

        while (!queue.isEmpty()) {
            final MapDocument result = queue.remove();

            final String orderFieldName = dedupConf.getWf().getOrderField();
            final Field orderFieldValue = result.values(orderFieldName);
            if (!orderFieldValue.isEmpty()) {
                final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
                if (field.equals(fieldRef)) {
                    tempResults.add(result);
                } else {
                    populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
                    tempResults.clear();
                    tempResults.add(result);
                    fieldRef = field;
                }
            } else {
                context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
            }
        }
        populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);

        return q;
    }

    private void populateSimplifiedQueue(final Queue<MapDocument> q,
            final List<MapDocument> tempResults,
            final Context context,
            final String fieldRef,
            final String ngram) {
        if (tempResults.size() < dedupConf.getWf().getGroupMaxSize()) {
            q.addAll(tempResults);
        } else {
            context.getCounter(dedupConf.getWf().getEntityType(),
                    "Skipped records for count(" + dedupConf.getWf().getOrderField() + ") >= " + dedupConf.getWf().getGroupMaxSize())
                    .increment(tempResults.size());
            log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
        }
    }

    private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {

        final PaceDocumentDistance algo = new PaceDocumentDistance();

        while (!queue.isEmpty()) {

            final MapDocument pivot = queue.remove();
            final String idPivot = pivot.getIdentifier();

            final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
            final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();

            if (fieldPivot != null) {
                // System.out.println(idPivot + " --> " + fieldPivot);

                int i = 0;
                for (final MapDocument curr : queue) {
                    final String idCurr = curr.getIdentifier();

                    if (mustSkip(idCurr)) {
                        context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
                        break;
                    }

                    if (i > dedupConf.getWf().getSlidingWindowSize()) {
                        break;
                    }

                    final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
                    final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();

                    if (!idCurr.equals(idPivot) && (fieldCurr != null)) {

                        final ScoreResult sr = similarity(algo, pivot, curr);
                        emitOutput(sr, idPivot, idCurr, context);
                        i++;
                    }
                }
            }
        }
    }

    private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Context context) throws IOException, InterruptedException {
        final double d = sr.getScore();

        if (d >= dedupConf.getWf().getThreshold()) {
            writeSimilarity(context, idPivot, idCurr);
            context.getCounter(dedupConf.getWf().getEntityType(), "dedup similarity (x2)").increment(1);
        } else {
            context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
        }
    }

    private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
        try {
            return algo.between(a, b, dedupConf);
        } catch (Throwable e) {
            log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
            throw new IllegalArgumentException(e);
        }
    }

    private boolean mustSkip(final String idPivot) {
        return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
    }

    private String getNsPrefix(final String id) {
        return StringUtils.substringBetween(id, "|", "::");
    }

    private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
        final byte[] rowKey = Bytes.toBytes(idPivot);
        final byte[] target = Bytes.toBytes(id);

        // log.info("writing similarity: " + idPivot + " <-> " + id);

        emitRel(context, rowKey, target);
        emitRel(context, target, rowKey);
    }

    private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
        final byte[] qualifier = HBaseTableDAO.getSimilarityQualifierBytes(Type.valueOf(dedupConf.getWf().getEntityType()), new String(to));
        final Put put = new Put(from).add(HBaseTableDAO.cfRelsByte(), qualifier, Bytes.toBytes(""));
        put.setWriteToWAL(JobParams.WRITE_TO_WAL);
        ibw.set(from);
        context.write(ibw, put);
    }
}
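process() is a sorted-neighborhood pass: the queue built by prepare() arrives ordered by the configured order field, each document in turn becomes the pivot, and only the documents inside the sliding window after it are scored against it. A minimal, self-contained sketch of that traversal, with plain strings standing in for MapDocument (all names here are illustrative, not part of this codebase):

    import java.util.*;

    public class SortedNeighborhoodSketch {

        // Pair each pivot with at most `window` followers, mirroring the loop in process().
        static List<String[]> pairs(final List<String> sortedIds, final int window) {
            final List<String[]> out = new ArrayList<>();
            final Deque<String> queue = new ArrayDeque<>(sortedIds);
            while (!queue.isEmpty()) {
                final String pivot = queue.remove();
                int i = 0;
                for (final String curr : queue) {
                    if (i++ >= window) {
                        break; // sliding window exhausted for this pivot
                    }
                    out.add(new String[] { pivot, curr });
                }
            }
            return out;
        }

        public static void main(final String[] args) {
            // with window 2 this prints: a ~ b, a ~ c, b ~ c, b ~ d, c ~ d
            for (final String[] p : pairs(Arrays.asList("a", "b", "c", "d"), 2)) {
                System.out.println(p[0] + " ~ " + p[1]);
            }
        }
    }

The payoff of the ordering is that the comparison count grows linearly with the queue size (times the window) instead of quadratically, at the cost of missing duplicates that sort far apart.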
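For context, a reducer like this is normally attached to a job through TableMapReduceUtil. The sketch below is an illustration under assumptions, not code from this repository: only JobParams.DEDUP_CONF and DedupReducer come from the diff, the driver class and argument layout are hypothetical, and the mapper (not shown) would have to emit <Text clusteringKey, ImmutableBytesWritable serializedMapDocument> pairs to match the reducer's signature:

    package eu.dnetlib.data.mapreduce.hbase.dedup;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.mapreduce.Job;

    import eu.dnetlib.data.mapreduce.JobParams;

    // Hypothetical driver, for illustration only.
    public class DedupReducerDriver {

        public static void main(final String[] args) throws Exception {
            final Configuration conf = HBaseConfiguration.create();
            // setup() reads the serialized DedupConfig from this property
            conf.set(JobParams.DEDUP_CONF, args[1]);

            final Job job = Job.getInstance(conf, "dedup similarity");
            job.setJarByClass(DedupReducerDriver.class);

            // mapper configuration omitted; args[0] names the target HBase table
            TableMapReduceUtil.initTableReducerJob(args[0], DedupReducer.class, job);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }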