Project

General

Profile

1
package eu.dnetlib.data.hadoop.action;
2

    
3
import java.util.Map;
4
import java.util.Map.Entry;
5
import java.util.Properties;
6
import java.util.Set;
7

    
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Sets;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.apache.hadoop.conf.Configuration;
14
import org.apache.oozie.client.OozieClient;
15
import org.apache.oozie.client.OozieClientException;
16

    
17
import eu.dnetlib.data.hadoop.HadoopJob;
18
import eu.dnetlib.data.hadoop.config.ClusterName;
19
import eu.dnetlib.data.hadoop.oozie.OozieJobMonitor;
20
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
21
import eu.dnetlib.data.hadoop.utils.JobProfile;
22
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
23

    
24
public class SubmitOozieJobAction extends AbstractSubmitAction {
25

    
26
	private static final Log log = LogFactory.getLog(SubmitOozieJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM
27

    
28
	public static final String OOZIE_REPORT_ACTIONS = "oozie.report.actions.csv";
29

    
30
	@Override
31
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
32
			throws HadoopServiceException {
33

    
34
		final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));
35

    
36
		try {
37
			final Properties jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
38
			log.debug("oozie job configuration:\n" + jobConf);
39

    
40
			if (!hadoopClientMap.isOozieAvailable(clusterName)) throw new HadoopServiceException("oozie not available for cluster: " + clusterName.toString());
41

    
42
			logJobDetails(jobConf);
43

    
44
			final OozieClient oozieClient = hadoopClientMap.getOozieClient(clusterName);
45
			final String internalId = oozieClient.run(jobConf);
46
			final String jobId = newJobId(clusterName, internalId);
47

    
48
			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
49
					new OozieJobMonitor(oozieClient, internalId, callback, getWorkflowActions(bbJob))));
50

    
51
		} catch (final OozieClientException e) {
52
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
53
		}
54
	}
55

    
56
	/**
57
	 * Extracts from the blackboard job params the oozie action names as csv and returns them as a set.
58
	 * @param bbJob
59
	 * @return
60
	 */
61
	private Set<String> getWorkflowActions(final BlackboardJob bbJob) {
62
		final String actions = bbJob.getParameters().get(OOZIE_REPORT_ACTIONS);
63
		if (StringUtils.isNotBlank(actions)) {
64
			return Sets.newHashSet(Splitter.on(",").split(actions));
65
		}
66
		return Sets.newHashSet();
67
	}
68

    
69
	private Properties prepareJob(final Configuration configuration, final String jobName, final JobProfile jobProfile, final Map<String, String> parameters) {
70

    
71
		log.info("creating job: " + jobName);
72

    
73
		final Properties p = new Properties();
74

    
75
		merge(p, configuration);
76
		merge(p, jobProfile.getJobDefinition().entrySet());
77
		merge(p, parameters.entrySet());
78

    
79
		return p;
80
	}
81

    
82
	private void merge(final Properties p, final Iterable<Entry<String, String>> entrySet) {
83
		for (final Entry<String, String> e : entrySet) {
84
			p.setProperty(e.getKey(), e.getValue());
85
		}
86
	}
87

    
88
	protected void logJobDetails(final Properties jobConf) {
89
		for (final Entry<?, ?> e : jobConf.entrySet()) {
90
			if (log.isDebugEnabled()) {
91
				log.debug("\n" + e.getKey().toString() + " : " + e.getValue().toString());
92
			}
93
		}
94
	}
95

    
96
}
(13-13/13)