Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.reset;
2

    
3
import java.io.IOException;
4
import java.util.Properties;
5

    
6
import org.apache.hadoop.hbase.client.Mutation;
7
import org.apache.hadoop.hbase.client.Scan;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
10
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
11
import org.apache.hadoop.io.Text;
12
import org.apache.hadoop.mapreduce.Job;
13

    
14
import eu.dnetlib.data.mapreduce.JobParams;
15
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;
16

    
17
public class HBaseResetJob extends AbstractHBaseMapReduceJob {
18

    
19
	@Override
20
	protected Job setJobDetails(Job job, Properties p) throws Exception {
21

    
22
		initMapper(job, getScan(p), p.getProperty(JobParams.HBASE_SOURCE_TABLE));
23

    
24
		job.setOutputFormatClass(TableOutputFormat.class);
25
		job.setMapOutputKeyClass(Text.class);
26
		job.setMapOutputValueClass(Mutation.class);
27
		job.setNumReduceTasks(0);
28

    
29
		job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
30
		job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
31

    
32
		return job;
33
	}
34

    
35
	private void initMapper(final Job job, final Scan scan, final String sourceTable) {
36
		try {
37
			TableMapReduceUtil.initTableMapperJob(sourceTable, scan, HBaseResetMapper.class, Text.class, ImmutableBytesWritable.class, job);
38
		} catch (IOException e) {
39
			throw new RuntimeException(e);
40
		}
41
	}
42

    
43
	private Scan getScan(Properties p) {
44
		Scan scan = new Scan();
45
		scan.setCaching(100); // 1 is the default in Scan, which will be bad for MapReduce jobs
46
		scan.setCacheBlocks(false); // don't set to true for MR jobs
47
		return scan;
48
	}
49

    
50
}
(1-1/2)