package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.util.DedupConfig;
import eu.dnetlib.pace.util.DedupConfigLoader;
public class DedupPersonJob extends AbstractHBaseMapReduceJob {
22

    
23
	@Override
24
	public Job setJobDetails(final Job job, final Properties p) {
25

    
26
		initJob(job, getScan(p), p.getProperty(JobParams.HBASE_SOURCE_TABLE));
27

    
28
		job.setOutputFormatClass(TableOutputFormat.class);
29

    
30
		job.setNumReduceTasks(1000);
31
		job.getConfiguration().setBoolean("mapred.compress.map.output", true);
32

    
33
		job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
34
		job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
35

    
36
		job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
37
		job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
38

    
39
		// TODO under test
40
		// job.setCombinerClass(DedupPersonReducer.class);
41

    
42
		job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, p.getProperty(JobParams.HBASE_TARGET_TABLE));
43

    
44
		return job;
45
	}
46

    
47
	private Scan getScan(final Properties p) {
48
		Scan scan = new Scan();
49
		scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
50
		scan.setCacheBlocks(false); // don't set to true for MR jobs
51

    
52
		DedupConfig dedupConfig = DedupConfigLoader.load(p.getProperty("dedup.wf.conf"));
53

    
54
		Type type = Type.valueOf(dedupConfig.getEntityType());
55
		scan.setFilter(new PrefixFilter(Bytes.toBytes(type.getNumber())));
56
		// scan.addFamily(dedupConfig.getEntityNameBytes());
57

    
58
		return scan;
59
	}
60

    
61
	private void initJob(final Job job, final Scan scan, final String sourceTable) {
62
		try {
63
			TableMapReduceUtil.initTableMapperJob(sourceTable, scan, DedupPersonMapper.class, Text.class, ImmutableBytesWritable.class, job);
64
			TableMapReduceUtil.initTableReducerJob(sourceTable, DedupPersonReducer.class, job);
65
		} catch (IOException e) {
66
			throw new RuntimeException(e);
67
		}
68
	}
69

    
70
}