Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Properties;
5

    
6
import org.apache.hadoop.hbase.client.Scan;
7
import org.apache.hadoop.hbase.filter.PrefixFilter;
8
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
9
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Job;
12

    
13
import eu.dnetlib.data.mapreduce.JobParams;
14
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;
15
import eu.dnetlib.data.proto.TypeProtos.Type;
16

    
17
public class FindDedupCandidatePersonsJob extends AbstractHBaseMapReduceJob {
18

    
19
	private static final String PERSON_SCAN_PREFIX = Type.person.getNumber() + "|";
20

    
21
	@Override
22
	public Job setJobDetails(Job job, Properties p) {
23
		p.setProperty("mapred.reduce.tasks.speculative.execution", "true");
24

    
25
		initMapper(job, getScan(p), p.getProperty(JobParams.HBASE_SOURCE_TABLE));
26

    
27
		job.setNumReduceTasks(100);
28

    
29
		return job;
30
	}
31

    
32
	private Scan getScan(Properties p) {
33
		Scan scan = new Scan();
34
		scan.setCaching(100);
35
		scan.setCacheBlocks(false);
36
		scan.setFilter(new PrefixFilter(Bytes.toBytes(PERSON_SCAN_PREFIX)));
37
		return scan;
38
	}
39

    
40
	private void initMapper(final Job job, final Scan scan, final String sourceTable) {
41
		try {
42
			TableMapReduceUtil.initTableMapperJob(sourceTable, scan, FindDedupCandidatePersonsMapper.class, Text.class, Text.class, job);
43
			TableMapReduceUtil.initTableReducerJob(sourceTable, FindDedupCandidatePersonsReducer.class, job);
44
		} catch (IOException e) {
45
			throw new RuntimeException(e);
46
		}
47
	}
48

    
49
}
(17-17/23)