Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase;
2

    
3
import java.io.IOException;
4
import java.util.Map.Entry;
5
import java.util.Properties;
6

    
7
import javax.annotation.Resource;
8

    
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.apache.hadoop.conf.Configured;
12
import org.apache.hadoop.fs.FileSystem;
13
import org.apache.hadoop.fs.Path;
14
import org.apache.hadoop.mapreduce.Job;
15
import org.springframework.beans.factory.BeanNameAware;
16

    
17
import eu.dnetlib.data.hadoop.config.ClusterName;
18
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
19
import eu.dnetlib.data.mapreduce.HadoopJob;
20

    
21
public abstract class AbstractHBaseMapReduceJob extends Configured implements HadoopJob, BeanNameAware {
22

    
23
	protected static final Log log = LogFactory.getLog(AbstractHBaseMapReduceJob.class);
24

    
25
	protected String jobName;
26

    
27
	@Resource
28
	protected ConfigurationEnumerator configurationEnumerator;
29

    
30
	protected abstract Job setJobDetails(Job job, Properties p) throws Exception;
31

    
32
	@Override
33
	public Job setJobDetails(ClusterName name, Properties p) {
34
		try {
35
			final Job job = createJobCommon(name, p);
36
			return setJobDetails(job, p);
37
		} catch (Exception e) {
38
			throw new RuntimeException("unable to define Job: " + getClass().getSimpleName(), e);
39
		}
40
	}
41

    
42
	protected Job createJobCommon(ClusterName name, Properties p) throws IOException {
43

    
44
		Job job = new Job(configurationEnumerator.get(name), getClass().getSimpleName());
45

    
46
		merge(job, p);
47

    
48
		return job;
49
	}
50

    
51
	protected void merge(final Job job, final Properties p) {
52
		for (Entry<Object, Object> e : p.entrySet()) {
53
			job.getConfiguration().set((String) e.getKey(), (String) e.getValue());
54
		}
55
	}
56

    
57
	protected void deleteHdfsFile(Job job, Path path) {
58
		try {
59
			FileSystem hdfs = FileSystem.get(job.getConfiguration());
60
			if (hdfs.exists(path)) {
61
				hdfs.delete(path, true);
62
			}
63
		} catch (IOException e) {
64
			e.printStackTrace();
65
		}
66
	}
67

    
68
	@Override
69
	public void setBeanName(String name) {
70
		this.jobName = name;
71
	}
72

    
73
	@Override
74
	public String getName() {
75
		return jobName;
76
	}
77

    
78
}
(1-1/2)