Project

General

Profile

« Previous | Next » 

Revision 53025

simplified connected component application on the graph

View differences:

modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java
45 45
	}
46 46

  
47 47
	@Override
48
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
48
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) {
49 49

  
50 50
		final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
51

  
52
		StreamUtils.toStream(values.iterator())
53
		//		.limit(1000)  // might cause timeouts in case of large number of items
54
				.flatMap(v -> v.getEdges().stream())
55
				.forEach(q -> {
56
					final byte[] qb = Bytes.toBytes(q.toString());
51
		values.forEach(vertex -> {
52
					final byte[] qb = Bytes.toBytes(vertex.getVertexId().toString());
57 53
					emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
58 54
					emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));
59 55

  

Also available in: Unified diff