package eu.dnetlib.data.hadoop.action;

import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.Map.Entry;

import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.data.hadoop.utils.ScanFactory;
import eu.dnetlib.data.hadoop.utils.ScanProperties;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class SubmitMapreduceJobAction extends AbstractSubmitAction {
26
27
	/**
28
	 * logger.
29
	 */
30
	private static final Log log = LogFactory.getLog(SubmitMapreduceJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM
31
32
	@Override
33 34703 claudio.at
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
34
			throws HadoopServiceException {
35 26600 sandro.lab
36
		final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));
37
38
		try {
39 34703 claudio.at
			final JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
40 26600 sandro.lab
41
			logJobDetails(jobConf);
42
43 49795 claudio.at
			final JobClient jtClient = hadoopClientMap.getJtClient(clusterName);
44
			final RunningJob runningJob = jtClient.submitJob(jobConf);
45
46 26600 sandro.lab
			final String jobId = newJobId(clusterName, runningJob.getID().getId());
47
48 49795 claudio.at
			jobRegistry.registerJob(
49
					HadoopJob.newInstance(jobId, clusterName, jobProfile, new MapreduceJobMonitor(jtClient, runningJob, callback)));
50 26600 sandro.lab
51 34703 claudio.at
		} catch (final IOException e) {
52 32818 claudio.at
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
53 26600 sandro.lab
		}
54
	}
55
56 31337 claudio.at
	protected JobConf prepareJob(final Configuration configuration, final String jobName, final JobProfile jobProfile, final Map<String, String> parameters)
57 29516 claudio.at
			throws IOException, HadoopServiceException {
58 26600 sandro.lab
59
		log.info("creating job: " + jobName);
60
61 34703 claudio.at
		final JobConf jobConf = new JobConf(configuration);
62 26600 sandro.lab
		jobConf.setJobName(jobName);
63
		jobConf.set("dnet.mapred.job.description", jobProfile.getDescription());
64
65 34703 claudio.at
		final String jobLib = getJobLib(configuration, jobProfile);
66 26600 sandro.lab
		jobConf.setJar(new Path(jobLib).toString());
67
68
		set(jobConf, jobProfile.getJobDefinition());
69
		set(jobConf, parameters);
70
71
		// if we're reading from hbase table then we need also to set a scanner.
72 34703 claudio.at
		final ScanProperties scanProperties = jobProfile.getScanProperties();
73 29516 claudio.at
		if (jobProfile.getRequiredParams().contains("hbase.mapreduce.inputtable") && (scanProperties != null)) {
74 26600 sandro.lab
			jobConf.set("hbase.mapreduce.scan", ScanFactory.getScan(scanProperties));
75
		}
76
77
		return jobConf;
78
	}
79
80 31337 claudio.at
	protected String getJobLib(final Configuration configuration, final JobProfile jobProfile) throws HadoopServiceException {
81 26600 sandro.lab
		String jobLib = getDefaultLibPath(configuration.get("fs.defaultFS"));
82
83
		if (jobProfile.getJobDefinition().containsKey("job.lib")) {
84
			jobLib = jobProfile.getJobDefinition().get("job.lib");
85
		}
86
87 29516 claudio.at
		if ((jobLib == null) || jobLib.isEmpty()) throw new HadoopServiceException("job.lib must refer to an absolute or relative HDFS path");
88 26600 sandro.lab
		if (!jobLib.startsWith("hdfs://")) {
89
			jobLib = configuration.get("fs.defaultFS") + jobLib;
90
		}
91
92
		log.info("using job.lib: " + jobLib);
93
		return jobLib;
94
	}
95
96 32295 claudio.at
	protected void set(final JobConf jobConf, final Map<String, String> properties) {
97 34703 claudio.at
		for (final Entry<String, String> e : properties.entrySet()) {
98 31337 claudio.at
			if (checkHdfsProperty(e)) {
99 34703 claudio.at
				final String v = jobConf.get("fs.defaultFS") + e.getValue();
100 31337 claudio.at
				e.setValue(v);
101
			}
102
			jobConf.set(e.getKey(), e.getValue());
103
		}
104
	}
105
106
	protected void logJobDetails(final JobConf jobConf) {
107 34703 claudio.at
		final StringWriter sw = new StringWriter();
108 26600 sandro.lab
		try {
109
			jobConf.writeXml(sw);
110
			if (log.isDebugEnabled()) {
111
				log.debug("\n" + IndentXmlString.apply(sw.toString()));
112
			}
113 34703 claudio.at
		} catch (final IOException e) {
114 26600 sandro.lab
			log.warn("unable to log job details: " + jobConf.getJobName());
115
		}
116
	}
117
118
}