Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.List;
6

    
7
import org.apache.hadoop.hbase.KeyValue;
8
import org.apache.hadoop.hbase.client.Put;
9
import org.apache.hadoop.hbase.client.Result;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14

    
15
import eu.dnetlib.data.mapreduce.util.DedupUtils;
16
import eu.dnetlib.data.proto.TypeProtos.Type;
17
import eu.dnetlib.pace.util.DedupConfig;
18
import eu.dnetlib.pace.util.DedupConfigLoader;
19

    
20
public class DedupGrouperMapper extends TableMapper<Text, Put> {
21

    
22
	private static final boolean WRITE_TO_WAL = false;
23

    
24
	public static final String COUNTER_GROUP = "dedup.grouper";
25

    
26
	public static final String COUNTER_NAME = "written.rels";
27

    
28
	private Text rowKey;
29

    
30
	private DedupConfig dedupConf;
31

    
32
	@Override
33
	protected void setup(final Context context) throws IOException, InterruptedException {
34
		rowKey = new Text();
35

    
36
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
37
	}
38

    
39
	@Override
40
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
41

    
42
		final List<KeyValue> kvList = value.list();
43
		//System.out.println("Grouper mapping " + kvList.size() + " rels for key: " + new String(keyIn.copyBytes()));
44

    
45
		for (KeyValue n : kvList) {
46
			for (KeyValue j : kvList) {
47

    
48
				byte[] nq = n.getQualifier();
49
				byte[] jq = j.getQualifier();
50

    
51
				if (!Arrays.equals(nq, jq)) {
52

    
53
					Put put = new Put(nq).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), jq, Bytes.toBytes(""));
54
					put.setWriteToWAL(WRITE_TO_WAL);
55
					rowKey.set(nq);
56
					context.write(rowKey, put);
57

    
58
					context.getCounter(COUNTER_GROUP, COUNTER_NAME).increment(1);
59
				}
60
			}
61
		}
62
	}
63

    
64
}
(7-7/23)