Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.index;
2

    
3
import java.io.IOException;
4
import java.util.Properties;
5

    
6
import org.apache.hadoop.fs.Path;
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Job;
9
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
11
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12

    
13
import eu.dnetlib.data.mapreduce.JobParams;
14
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;
15

    
16
public class DocumentDatabaseFeedJob extends AbstractHBaseMapReduceJob {
17

    
18
	@Override
19
	public Job setJobDetails(final Job job, final Properties p) {
20

    
21
		job.setInputFormatClass(SequenceFileInputFormat.class);
22
		try {
23
			FileInputFormat.setInputPaths(job, p.getProperty(JobParams.MAPRED_INPUT_DIR));
24
			FileOutputFormat.setOutputPath(job, new Path(p.getProperty(JobParams.DOCUMENT_DB_ROTTEN_FILE)));
25
		} catch (IOException e) {
26
			throw new RuntimeException(e);
27
		}
28
		job.setMapperClass(DocumentDatabaseMapper.class);
29
		job.setMapOutputKeyClass(Text.class);
30
		job.setMapOutputValueClass(Text.class);
31

    
32
		job.setNumReduceTasks(0);
33

    
34
		job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
35
		job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
36

    
37
		job.getConfiguration().setBoolean("mapred.compress.map.output", true);
38

    
39
		return job;
40
	}
41
}
(2-2/7)