Revision 52984
Added by Claudio Atzori over 5 years ago
ConnectedComponentsReducer.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 | 4 |
import java.nio.ByteBuffer; |
5 |
import java.util.Set; |
|
6 | 5 |
|
7 |
import com.google.common.collect.Sets; |
|
8 | 6 |
import eu.dnetlib.data.mapreduce.JobParams; |
9 | 7 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
8 |
import eu.dnetlib.data.mapreduce.util.StreamUtils; |
|
10 | 9 |
import eu.dnetlib.data.proto.DedupProtos.Dedup; |
11 |
import eu.dnetlib.data.proto.KindProtos.Kind; |
|
12 | 10 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
13 | 11 |
import eu.dnetlib.data.proto.OafProtos.OafRel; |
14 |
import eu.dnetlib.data.proto.OafProtos.OafRel.Builder; |
|
15 | 12 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
16 |
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions; |
|
17 | 13 |
import eu.dnetlib.pace.config.DedupConfig; |
18 | 14 |
import org.apache.commons.logging.Log; |
19 | 15 |
import org.apache.commons.logging.LogFactory; |
16 |
import org.apache.hadoop.hbase.client.Durability; |
|
20 | 17 |
import org.apache.hadoop.hbase.client.Put; |
21 | 18 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
22 | 19 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
... | ... | |
50 | 47 |
@Override |
51 | 48 |
protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException { |
52 | 49 |
|
53 |
final Set<String> set = Sets.newHashSet(); |
|
54 |
|
|
55 |
for(VertexWritable v : values) { |
|
56 |
for(Text t : v.getEdges()) { |
|
57 |
set.add(t.toString()); |
|
58 |
} |
|
59 |
} |
|
60 |
|
|
61 | 50 |
final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun()); |
62 | 51 |
|
63 |
for(String q : set) { |
|
64 |
final byte[] qb = Bytes.toBytes(q); |
|
65 |
emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn)); |
|
66 |
emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges)); |
|
52 |
StreamUtils.toStream(values.iterator()) |
|
53 |
// .limit(1000) // might cause timeouts in case of large number of items |
|
54 |
.flatMap(v -> v.getEdges().stream()) |
|
55 |
.forEach(q -> { |
|
56 |
final byte[] qb = Bytes.toBytes(q.toString()); |
|
57 |
emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn)); |
|
58 |
emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges)); |
|
67 | 59 |
|
68 |
context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1); |
|
69 |
} |
|
60 |
context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1); |
|
70 | 61 |
|
62 |
}); |
|
71 | 63 |
} |
72 | 64 |
|
73 |
private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException, |
|
74 |
InterruptedException { |
|
65 |
private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) { |
|
75 | 66 |
final Put put = new Put(from).add(cf, to, value); |
76 |
put.setWriteToWAL(JobParams.WRITE_TO_WAL); |
|
77 |
context.write(new ImmutableBytesWritable(from), put); |
|
67 |
put.setDurability(Durability.SKIP_WAL); |
|
68 |
try { |
|
69 |
context.write(new ImmutableBytesWritable(from), put); |
|
70 |
} catch (IOException | InterruptedException e) { |
|
71 |
throw new RuntimeException(e); |
|
72 |
} |
|
78 | 73 |
} |
79 | 74 |
|
80 | 75 |
private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) { |
Also available in: Unified diff
do not push vertex ids in memory, process them on the fly