Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Properties;
5

    
6
import org.apache.hadoop.hbase.client.Scan;
7
import org.apache.hadoop.hbase.filter.PrefixFilter;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Job;
13

    
14
import eu.dnetlib.data.mapreduce.JobParams;
15
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;
16
import eu.dnetlib.data.proto.TypeProtos.Type;
17
import eu.dnetlib.pace.util.DedupConfig;
18
import eu.dnetlib.pace.util.DedupConfigLoader;
19

    
20
public class DedupBuildRootsJob extends AbstractHBaseMapReduceJob {
21

    
22
	@Override
23
	public Job setJobDetails(final Job job, final Properties p) {
24

    
25
		initMapper(job, getScan(p), p.getProperty(JobParams.HBASE_SOURCE_TABLE));
26

    
27
		job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
28
		job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
29

    
30
		job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
31
		job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
32

    
33
		job.setNumReduceTasks(500);
34

    
35
		return job;
36
	}
37

    
38
	private Scan getScan(final Properties p) {
39
		Scan scan = new Scan();
40
		scan.setCaching(100); // 1 is the default in Scan, which will be bad for MapReduce jobs
41

    
42
		scan.setCacheBlocks(false); // don't set to true for MR jobs
43

    
44
		DedupConfig dedupConfig = DedupConfigLoader.load(p.getProperty("dedup.wf.conf"));
45

    
46
		scan.setFilter(new PrefixFilter(Bytes.toBytes(Type.valueOf(dedupConfig.getEntityType()).getNumber())));
47

    
48
		return scan;
49
	}
50

    
51
	private void initMapper(final Job job, final Scan scan, final String sourceTable) {
52
		try {
53
			TableMapReduceUtil.initTableMapperJob(sourceTable, scan, DedupBuildRootsMapper.class, Text.class, ImmutableBytesWritable.class, job);
54
			TableMapReduceUtil.initTableReducerJob(sourceTable, DedupBuildRootsReducer.class, job);
55
		} catch (IOException e) {
56
			throw new RuntimeException(e);
57
		}
58
	}
59

    
60
}
(1-1/23)