Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5

    
6
import org.apache.hadoop.hbase.client.Delete;
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10
import org.apache.hadoop.io.Writable;
11

    
12
import eu.dnetlib.data.mapreduce.JobParams;
13
import eu.dnetlib.data.mapreduce.util.DedupUtils;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import eu.dnetlib.pace.config.DedupConfig;
16

    
17
public class DedupDeleteSimRelMapper extends TableMapper<ImmutableBytesWritable, Writable> {
18

    
19
	private DedupConfig dedupConf;
20

    
21
	private ImmutableBytesWritable outKey;
22

    
23
	@Override
24
	protected void setup(final Context context) throws IOException, InterruptedException {
25
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
26
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
27

    
28
		outKey = new ImmutableBytesWritable();
29
	}
30

    
31
	@Override
32
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
33
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
34

    
35
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
36

    
37
		final Map<byte[], byte[]> similarRels = value.getFamilyMap(DedupUtils.getSimilarityCFBytes(type));
38

    
39
		if ((similarRels != null) && !similarRels.isEmpty()) {
40

    
41
			final byte[] row = rowkey.copyBytes();
42
			final Delete delete = new Delete(row);
43

    
44
			for (final byte[] q : similarRels.keySet()) {
45
				delete.deleteColumns(DedupUtils.getSimilarityCFBytes(type), q);
46
			}
47

    
48
			outKey.set(row);
49
			context.write(outKey, delete);
50
			context.getCounter(dedupConf.getWf().getEntityType(), "similarity deleted").increment(similarRels.size());
51

    
52
		} else {
53
			context.getCounter(dedupConf.getWf().getEntityType(), "row not in similarity mesh").increment(1);
54
		}
55
	}
56
}
(4-4/22)