Project

General

Profile

1
//package eu.dnetlib.data.mapreduce.hbase.dedup.cc;
2
//
3
//import java.io.IOException;
4
//import java.nio.ByteBuffer;
5
//import java.util.Set;
6
//
7
//import com.google.common.collect.Sets;
8
//import eu.dnetlib.data.mapreduce.JobParams;
9
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
10
//import eu.dnetlib.data.proto.DNGFProtos.DNGF;
11
//import eu.dnetlib.data.proto.DNGFProtos.DNGFRel;
12
//import eu.dnetlib.data.proto.KindProtos.Kind;
13
//import eu.dnetlib.data.proto.SubRelProtos.Dedup;
14
//import eu.dnetlib.data.proto.TypeProtos.Type;
15
//import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
16
//import eu.dnetlib.pace.config.DedupConfig;
17
//import org.apache.commons.logging.Log;
18
//import org.apache.commons.logging.LogFactory;
19
//import org.apache.hadoop.hbase.client.Put;
20
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
21
//import org.apache.hadoop.hbase.mapreduce.TableReducer;
22
//import org.apache.hadoop.hbase.util.Bytes;
23
//import org.apache.hadoop.io.Text;
24
//
25
///**
26
// * Created by claudio on 15/10/15.
27
// */
28
//public class ConnectedComponentsReducer extends TableReducer<Text, VertexWritable, ImmutableBytesWritable> {
29
//
30
//	private static final Log log = LogFactory.getLog(ConnectedComponentsReducer.class);
31
//
32
//	private DedupConfig dedupConf;
33
//
34
//	private byte[] cfMergedIn;
35
//
36
//	private byte[] cfMerges;
37
//
38
//	@Override
39
//	protected void setup(final Context context) {
40
//
41
//		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
42
//		log.info("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
43
//
44
//		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
45
//		cfMergedIn = DedupUtils.getDedupCF_mergedInBytes(type);
46
//		cfMerges = DedupUtils.getDedupCF_mergesBytes(type);
47
//	}
48
//
49
//	@Override
50
//	protected void reduce(Text key, Iterable<VertexWritable> values, Context context) throws IOException, InterruptedException {
51
//
52
//		final Set<String> set = Sets.newHashSet();
53
//
54
//		for(VertexWritable v : values) {
55
//			for(Text t : v.getEdges()) {
56
//				set.add(t.toString());
57
//			}
58
//		}
59
//
60
//		final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
61
//
62
//		for(String q : set) {
63
//			final byte[] qb = Bytes.toBytes(q);
64
//			emitDedupRel(context, cfMergedIn, qb, root, buildRel(qb, root, Dedup.RelName.isMergedIn));
65
//			emitDedupRel(context, cfMerges, root, qb, buildRel(root, qb, Dedup.RelName.merges));
66
//
67
//			context.getCounter(dedupConf.getWf().getEntityType(), "dedupRel (x2)").increment(1);
68
//		}
69
//
70
//	}
71
//
72
//	private void emitDedupRel(final Context context, final byte[] cf, final byte[] from, final byte[] to, final byte[] value) throws IOException,
73
//			InterruptedException {
74
//		final Put put = new Put(from).add(cf, to, value);
75
//		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
76
//		context.write(new ImmutableBytesWritable(from), put);
77
//	}
78
//
79
//	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
80
//		final DNGFRel.Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
81
//		final DNGF oaf =
82
//				DNGF.newBuilder()
83
//						.setKind(Kind.relation)
84
//						.setLastupdatetimestamp(System.currentTimeMillis())
85
//						.setDataInfo(
86
//								AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
87
//										dedupConf.getWf().getConfigurationId())).setRel(oafRel)
88
//						.build();
89
//		return oaf.toByteArray();
90
//	}
91
//
92
//}
(2-2/6)