Project

General

Profile

« Previous | Next » 

Revision 52984

do not push vertex ids in memory, process them on the fly

View differences:

modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java
2 2

  
3 3
import java.io.IOException;
4 4
import java.nio.ByteBuffer;
5
import java.util.Set;
6 5

  
7
import com.google.common.collect.Sets;
8 6
import eu.dnetlib.data.mapreduce.JobParams;
9 7
import eu.dnetlib.data.mapreduce.util.DedupUtils;
8
import eu.dnetlib.data.mapreduce.util.StreamUtils;
10 9
import eu.dnetlib.data.proto.DedupProtos.Dedup;
11
import eu.dnetlib.data.proto.KindProtos.Kind;
12 10
import eu.dnetlib.data.proto.OafProtos.Oaf;
13 11
import eu.dnetlib.data.proto.OafProtos.OafRel;
14
import eu.dnetlib.data.proto.OafProtos.OafRel.Builder;
15 12
import eu.dnetlib.data.proto.TypeProtos.Type;
16
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
17 13
import eu.dnetlib.pace.config.DedupConfig;
18 14
import org.apache.commons.logging.Log;
19 15
import org.apache.commons.logging.LogFactory;
16
import org.apache.hadoop.hbase.client.Durability;
20 17
import org.apache.hadoop.hbase.client.Put;
21 18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22 19
import org.apache.hadoop.hbase.mapreduce.TableReducer;
......
50 47
	@Override
51 48
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
52 49

  
53
		final Set<String> set = Sets.newHashSet();
54

  
55
		for(VertexWritable v : values) {
56
			for(Text t : v.getEdges()) {
57
				set.add(t.toString());
58
			}
59
		}
60

  
61 50
		final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
62 51

  
63
		for(String q : set) {
64
			final byte[] qb = Bytes.toBytes(q);
65
			emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
66
			emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));
52
		StreamUtils.toStream(values.iterator())
53
		//		.limit(1000)  // might cause timeouts in case of large number of items 
54
				.flatMap(v -> v.getEdges().stream())
55
				.forEach(q -> {
56
					final byte[] qb = Bytes.toBytes(q.toString());
57
					emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
58
					emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));
67 59

  
68
			context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
69
		}
60
					context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
70 61

  
62
				});
71 63
	}
72 64

  
73
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
74
			InterruptedException {
65
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) {
75 66
		final Put put = new Put(from).add(cf, to, value);
76
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
77
		context.write(new ImmutableBytesWritable(from), put);
67
		put.setDurability(Durability.SKIP_WAL);
68
		try {
69
			context.write(new ImmutableBytesWritable(from), put);
70
		} catch (IOException | InterruptedException e) {
71
			throw new RuntimeException(e);
72
		}
78 73
	}
79 74

  
80 75
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {

Also available in: Unified diff