Project

General

Profile

1
package eu.dnetlib.data.hadoop.blackboard;
2

    
3
import java.io.IOException;
4
import java.io.StringWriter;
5
import java.util.Map;
6
import java.util.Map.Entry;
7

    
8
import eu.dnetlib.miscutils.functional.xml.XMLIndenter;
9
import eu.dnetlib.rmi.data.hadoop.ClusterName;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.apache.hadoop.conf.Configuration;
13
import org.apache.hadoop.fs.Path;
14
import org.apache.hadoop.mapred.JobConf;
15
import org.apache.hadoop.mapred.RunningJob;
16

    
17
import eu.dnetlib.data.hadoop.HadoopJob;
18
import eu.dnetlib.data.hadoop.mapreduce.MapreduceJobMonitor;
19
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
20
import eu.dnetlib.data.hadoop.utils.JobProfile;
21
import eu.dnetlib.data.hadoop.utils.ScanFactory;
22
import eu.dnetlib.data.hadoop.utils.ScanProperties;
23
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
24
import org.springframework.beans.factory.annotation.Autowired;
25

    
26
public class SubmitMapreduceJobAction extends AbstractSubmitAction {
27

    
28
	/**
29
	 * logger.
30
	 */
31
	private static final Log log = LogFactory.getLog(SubmitMapreduceJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM
32

    
33
	@Autowired
34
	private ScanFactory scanFactory;
35

    
36
	@Override
37
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
38
			throws HadoopServiceException {
39

    
40
		final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));
41

    
42
		try {
43
			final JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
44

    
45
			if (!hadoopClientMap.isMapreduceAvailable(clusterName))
46
				throw new HadoopServiceException("mapreduce not available for cluster: " + clusterName.toString());
47

    
48
			logJobDetails(jobConf);
49

    
50
			final RunningJob runningJob = hadoopClientMap.getJtClient(clusterName).submitJob(jobConf);
51
			final String jobId = newJobId(clusterName, runningJob.getID().getId());
52

    
53
			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
54
					new MapreduceJobMonitor(runningJob, callback)));
55

    
56
		} catch (final IOException e) {
57
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
58
		}
59
	}
60

    
61
	protected JobConf prepareJob(final Configuration configuration, final String jobName, final JobProfile jobProfile, final Map<String, String> parameters)
62
			throws IOException, HadoopServiceException {
63

    
64
		log.info("creating job: " + jobName);
65

    
66
		final JobConf jobConf = new JobConf(configuration);
67
		jobConf.setJobName(jobName);
68
		jobConf.set("dnet.mapred.job.description", jobProfile.getDescription());
69

    
70
		final String jobLib = getJobLib(configuration, jobProfile);
71
		jobConf.setJar(new Path(jobLib).toString());
72

    
73
		set(jobConf, jobProfile.getJobDefinition());
74
		set(jobConf, parameters);
75

    
76
		// if we're reading from hbase table then we need also to set a scanner.
77
		final ScanProperties scanProperties = jobProfile.getScanProperties();
78
		if (jobProfile.getRequiredParams().contains("hbase.mapreduce.inputtable") && (scanProperties != null)) {
79
			jobConf.set("hbase.mapreduce.scan", scanFactory.getScan(scanProperties));
80
		}
81

    
82
		return jobConf;
83
	}
84

    
85
	protected String getJobLib(final Configuration configuration, final JobProfile jobProfile) throws HadoopServiceException {
86
		String jobLib = getDefaultLibPath(configuration.get("fs.defaultFS"));
87

    
88
		if (jobProfile.getJobDefinition().containsKey("job.lib")) {
89
			jobLib = jobProfile.getJobDefinition().get("job.lib");
90
		}
91

    
92
		if ((jobLib == null) || jobLib.isEmpty()) throw new HadoopServiceException("job.lib must refer to an absolute or relative HDFS path");
93
		if (!jobLib.startsWith("hdfs://")) {
94
			jobLib = configuration.get("fs.defaultFS") + jobLib;
95
		}
96

    
97
		log.info("using job.lib: " + jobLib);
98
		return jobLib;
99
	}
100

    
101
	protected void set(final JobConf jobConf, final Map<String, String> properties) {
102
		for (final Entry<String, String> e : properties.entrySet()) {
103
			if (checkHdfsProperty(e)) {
104
				final String v = jobConf.get("fs.defaultFS") + e.getValue();
105
				e.setValue(v);
106
			}
107
			jobConf.set(e.getKey(), e.getValue());
108
		}
109
	}
110

    
111
	protected void logJobDetails(final JobConf jobConf) {
112
		final StringWriter sw = new StringWriter();
113
		try {
114
			jobConf.writeXml(sw);
115
			if (log.isDebugEnabled()) {
116
				log.debug("\n" + XMLIndenter.indent(sw.toString()));
117
			}
118
		} catch (final IOException e) {
119
			log.warn("unable to log job details: " + jobConf.getJobName());
120
		}
121
	}
122

    
123
}
(12-12/13)