Project

General

Profile

1
package eu.dnetlib.data.hadoop.action;
2

    
3
import java.io.IOException;
4
import java.io.StringWriter;
5
import java.util.Map;
6
import java.util.Map.Entry;
7

    
8
import eu.dnetlib.data.hadoop.HadoopJob;
9
import eu.dnetlib.data.hadoop.config.ClusterName;
10
import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
11
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
12
import eu.dnetlib.data.hadoop.utils.JobProfile;
13
import eu.dnetlib.data.hadoop.utils.ScanFactory;
14
import eu.dnetlib.data.hadoop.utils.ScanProperties;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.hadoop.conf.Configuration;
20
import org.apache.hadoop.fs.Path;
21
import org.apache.hadoop.mapred.JobClient;
22
import org.apache.hadoop.mapred.JobConf;
23
import org.apache.hadoop.mapred.RunningJob;
24

    
25
public class SubmitMapreduceJobAction extends AbstractSubmitAction {
26

    
27
	/**
28
	 * logger.
29
	 */
30
	private static final Log log = LogFactory.getLog(SubmitMapreduceJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM
31

    
32
	@Override
33
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
34
			throws HadoopServiceException {
35

    
36
		final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));
37

    
38
		try {
39
			final JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
40

    
41
			logJobDetails(jobConf);
42

    
43
			final JobClient jtClient = hadoopClientMap.getJtClient(clusterName);
44
			final RunningJob runningJob = jtClient.submitJob(jobConf);
45

    
46
			final String jobId = newJobId(clusterName, runningJob.getID().getId());
47

    
48
			jobRegistry.registerJob(
49
					HadoopJob.newInstance(jobId, clusterName, jobProfile, new MapreduceJobMonitor(jtClient, runningJob, callback)));
50

    
51
		} catch (final IOException e) {
52
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
53
		}
54
	}
55

    
56
	protected JobConf prepareJob(final Configuration configuration, final String jobName, final JobProfile jobProfile, final Map<String, String> parameters)
57
			throws IOException, HadoopServiceException {
58

    
59
		log.info("creating job: " + jobName);
60

    
61
		final JobConf jobConf = new JobConf(configuration);
62
		jobConf.setJobName(jobName);
63
		jobConf.set("dnet.mapred.job.description", jobProfile.getDescription());
64

    
65
		final String jobLib = getJobLib(configuration, jobProfile);
66
		jobConf.setJar(new Path(jobLib).toString());
67

    
68
		set(jobConf, jobProfile.getJobDefinition());
69
		set(jobConf, parameters);
70

    
71
		// if we're reading from hbase table then we need also to set a scanner.
72
		final ScanProperties scanProperties = jobProfile.getScanProperties();
73
		if (jobProfile.getRequiredParams().contains("hbase.mapreduce.inputtable") && (scanProperties != null)) {
74
			jobConf.set("hbase.mapreduce.scan", ScanFactory.getScan(scanProperties));
75
		}
76

    
77
		return jobConf;
78
	}
79

    
80
	protected String getJobLib(final Configuration configuration, final JobProfile jobProfile) throws HadoopServiceException {
81
		String jobLib = getDefaultLibPath(configuration.get("fs.defaultFS"));
82

    
83
		if (jobProfile.getJobDefinition().containsKey("job.lib")) {
84
			jobLib = jobProfile.getJobDefinition().get("job.lib");
85
		}
86

    
87
		if ((jobLib == null) || jobLib.isEmpty()) throw new HadoopServiceException("job.lib must refer to an absolute or relative HDFS path");
88
		if (!jobLib.startsWith("hdfs://")) {
89
			jobLib = configuration.get("fs.defaultFS") + jobLib;
90
		}
91

    
92
		log.info("using job.lib: " + jobLib);
93
		return jobLib;
94
	}
95

    
96
	protected void set(final JobConf jobConf, final Map<String, String> properties) {
97
		for (final Entry<String, String> e : properties.entrySet()) {
98
			if (checkHdfsProperty(e)) {
99
				final String v = jobConf.get("fs.defaultFS") + e.getValue();
100
				e.setValue(v);
101
			}
102
			jobConf.set(e.getKey(), e.getValue());
103
		}
104
	}
105

    
106
	protected void logJobDetails(final JobConf jobConf) {
107
		final StringWriter sw = new StringWriter();
108
		try {
109
			jobConf.writeXml(sw);
110
			if (log.isDebugEnabled()) {
111
				log.debug("\n" + IndentXmlString.apply(sw.toString()));
112
			}
113
		} catch (final IOException e) {
114
			log.warn("unable to log job details: " + jobConf.getJobName());
115
		}
116
	}
117

    
118
}
(12-12/13)