package eu.dnetlib.data.mapreduce.hbase.dedup.cc;

import java.io.IOException;
import java.nio.ByteBuffer;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.StreamUtils;
import eu.dnetlib.data.proto.DedupProtos.Dedup;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

/**
 * Reducer of the connected components step of the HBase deduplication workflow: each input key
 * identifies a connected component (a group of records identified as duplicates), and the
 * associated VertexWritable values carry the ids of its members. For every member, a symmetric
 * pair of dedup relations (isMergedIn / merges) linking it to the component root is written to HBase.
 *
 * Created by claudio on 15/10/15.
 */
public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {

	private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);

	private DedupConfig dedupConf;

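	// HBase column families where the two directions of the dedup relation are stored:
	// cfMergedIn holds "isMergedIn" (member -> root), cfMerges holds "merges" (root -> member)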
	private byte[] cfMergedIn;

	private byte[] cfMerges;

	@Override
	protected void setup(final Context context) {

		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		log.info("dedup connected components reducer\nwf conf: " + dedupConf.toString());

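		// resolve the column family names from the entity type declared in the dedup configuration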
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
		cfMergedIn = DedupUtils.getDedupCF_mergedInBytes(type);
		cfMerges = DedupUtils.getDedupCF_mergesBytes(type);
	}

	@Override
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {

		// the component identifier carried by the key becomes the dedup root id for the current dedup run
		final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());

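		// for every member of the connected component, emit the symmetric pair of dedup relations:
		// member --isMergedIn--> root and root --merges--> member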
		StreamUtils.toStream(values.iterator())
		//		.limit(1000)  // might cause timeouts in case of large number of items
				.flatMap(v -> v.getEdges().stream())
				.forEach(q -> {
					final byte[] qb = Bytes.toBytes(q.toString());
					emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
					emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));

					context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);

				});
	}

	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) {
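		// row key = from, column = cf:to, cell value = the serialized relation;
		// the WAL is skipped (Durability.SKIP_WAL), trading durability for write throughput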
		final Put put = new Put(from).add(cf, to, value);
		put.setDurability(Durability.SKIP_WAL);
		try {
			context.write(new ImmutableBytesWritable(from), put);
		} catch (IOException | InterruptedException e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * Serializes the dedup relation of type relClass between the 'from' and 'to' identifiers as an Oaf protobuf message.
	 */
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {

		final OafRel.Builder oafRef = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
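		// the 0.8 constant is presumably the trust value attached to the generated relation (assumption, see DedupUtils.buildRel)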
		final Oaf oaf = DedupUtils.buildRel(dedupConf, oafRef, 0.8).build();

		return oaf.toByteArray();
	}

}
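
// Usage sketch (assumption, not confirmed by this file): a driver would typically wire this reducer
// into the connected components job with something like
//   TableMapReduceUtil.initTableReducerJob(dedupTable, ConnectedComponentsReducer.class, job);
// where 'dedupTable' is the target HBase table, after setting Text as the map output key class
// and VertexWritable as the map output value class, matching this reducer's input types.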