Project

General

Profile

1
package eu.dnetlib.data.hadoop.blackboard;
2

    
3
import java.util.Map;
4
import java.util.Map.Entry;
5
import java.util.Properties;
6

    
7
import eu.dnetlib.rmi.data.hadoop.ClusterName;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.conf.Configuration;
11
import org.apache.oozie.client.OozieClient;
12
import org.apache.oozie.client.OozieClientException;
13

    
14
import eu.dnetlib.data.hadoop.HadoopJob;
15
import eu.dnetlib.data.hadoop.oozie.OozieJobMonitor;
16
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
17
import eu.dnetlib.data.hadoop.utils.JobProfile;
18
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
19

    
20
public class SubmitOozieJobAction extends AbstractSubmitAction {
21

    
22
	private static final Log log = LogFactory.getLog(SubmitOozieJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM
23

    
24
	@Override
25
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
26
			throws HadoopServiceException {
27

    
28
		final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));
29

    
30
		try {
31
			final Properties jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
32
			log.debug("oozie job configuration:\n" + jobConf);
33

    
34
			if (!hadoopClientMap.isOozieAvailable(clusterName)) throw new HadoopServiceException("oozie not available for cluster: " + clusterName.toString());
35

    
36
			logJobDetails(jobConf);
37

    
38
			final OozieClient oozieClient = hadoopClientMap.getOozieClient(clusterName);
39
			final String internalId = oozieClient.run(jobConf);
40
			final String jobId = newJobId(clusterName, internalId);
41

    
42
			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
43
					new OozieJobMonitor(oozieClient, internalId, callback)));
44

    
45
		} catch (final OozieClientException e) {
46
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
47
		}
48
	}
49

    
50
	private Properties prepareJob(final Configuration configuration, final String jobName, final JobProfile jobProfile, final Map<String, String> parameters) {
51

    
52
		log.info("creating job: " + jobName);
53

    
54
		final Properties p = new Properties();
55

    
56
		merge(p, configuration);
57
		merge(p, jobProfile.getJobDefinition().entrySet());
58
		merge(p, parameters.entrySet());
59

    
60
		return p;
61
	}
62

    
63
	private void merge(final Properties p, final Iterable<Entry<String, String>> entrySet) {
64
		for (final Entry<String, String> e : entrySet) {
65
			p.setProperty(e.getKey(), e.getValue());
66
		}
67
	}
68

    
69
	protected void logJobDetails(final Properties jobConf) {
70
		for (final Entry<?, ?> e : jobConf.entrySet()) {
71
			if (log.isDebugEnabled()) {
72
				log.debug("\n" + e.getKey().toString() + " : " + e.getValue().toString());
73
			}
74
		}
75
	}
76

    
77
}
(13-13/13)