Revision 53025
Added by Claudio Atzori about 6 years ago
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java | ||
---|---|---|
45 | 45 |
} |
46 | 46 |
|
47 | 47 |
@Override |
48 |
protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
|
|
48 |
protected void reduce(Text key, Iterable<VertexWritable> values, Context context) { |
|
49 | 49 |
|
50 | 50 |
final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun()); |
51 |
|
|
52 |
StreamUtils.toStream(values.iterator()) |
|
53 |
// .limit(1000) // might cause timeouts in case of large number of items |
|
54 |
.flatMap(v -> v.getEdges().stream()) |
|
55 |
.forEach(q -> { |
|
56 |
final byte[] qb = Bytes.toBytes(q.toString()); |
|
51 |
values.forEach(vertex -> { |
|
52 |
final byte[] qb = Bytes.toBytes(vertex.getVertexId().toString()); |
|
57 | 53 |
emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn)); |
58 | 54 |
emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges)); |
59 | 55 |
|
Also available in: Unified diff
Simplified how connected components are applied to the graph.