Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2

    
3
import java.io.IOException;
4
import java.nio.ByteBuffer;
5
import java.util.Set;
6

    
7
import com.google.common.collect.Sets;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.util.DedupUtils;
10
import eu.dnetlib.data.proto.DedupProtos.Dedup;
11
import eu.dnetlib.data.proto.KindProtos.Kind;
12
import eu.dnetlib.data.proto.OafProtos.Oaf;
13
import eu.dnetlib.data.proto.OafProtos.OafRel;
14
import eu.dnetlib.data.proto.OafProtos.OafRel.Builder;
15
import eu.dnetlib.data.proto.TypeProtos.Type;
16
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
17
import eu.dnetlib.pace.config.DedupConfig;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.apache.hadoop.hbase.client.Put;
21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.hbase.mapreduce.TableReducer;
23
import org.apache.hadoop.hbase.util.Bytes;
24
import org.apache.hadoop.io.Text;
25

    
26
/**
27
 * Created by claudio on 15/10/15.
28
 */
29
public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {
30

    
31
	private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);
32

    
33
	private DedupConfig dedupConf;
34

    
35
	private byte[] cfMergedIn;
36

    
37
	private byte[] cfMerges;
38

    
39
	@Override
40
	protected void setup(final Context context) {
41

    
42
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
43
		log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
44

    
45
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
46
		cfMergedIn = DedupUtils.getDedupCF_mergedInBytes(type);
47
		cfMerges = DedupUtils.getDedupCF_mergesBytes(type);
48
	}
49

    
50
	@Override
51
	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
52

    
53
		final Set<String> set = Sets.newHashSet();
54

    
55
		for(VertexWritable v : values) {
56
			for(Text t : v.getEdges()) {
57
				set.add(t.toString());
58
			}
59
		}
60

    
61
		final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
62

    
63
		for(String q : set) {
64
			final byte[] qb = Bytes.toBytes(q);
65
			emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
66
			emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));
67

    
68
			context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
69
		}
70

    
71
	}
72

    
73
	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
74
			InterruptedException {
75
		final Put put = new Put(from).add(cf, to, value);
76
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
77
		context.write(new ImmutableBytesWritable(from), put);
78
	}
79

    
80
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
81

    
82
		final OafRel.Builder oafRef = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
83
		final Oaf oaf = DedupUtils.buildRel(dedupConf, oafRef, 0.8).build();
84

    
85
		return oaf.toByteArray();
86
	}
87

    
88
}
(2-2/6)