Project

General

Profile

1
package eu.dnetlib.msro.workflows.hadoop;
2

    
3
import java.util.List;
4

    
5
import javax.annotation.Resource;
6

    
7
import org.apache.commons.lang.StringUtils;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10

    
11
import com.google.common.collect.Iterables;
12
import com.googlecode.sarasvati.NodeToken;
13

    
14
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
15
import eu.dnetlib.data.hadoop.rmi.HadoopJobType;
16
import eu.dnetlib.data.hadoop.rmi.HadoopService;
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
18
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
19
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
20
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
21
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
22

    
23
public class SubmitHadoopJobNode extends BlackboardJobNode {
24

    
25
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(SubmitHadoopJobNode.class);
29

    
30
	public static final String OOZIE_REPORT_ACTIONS = "oozie.report.actions.csv";
31

    
32
	@Resource
33
	private UniqueServiceLocator serviceLocator;
34

    
35
	private String hadoopJob;
36

    
37
	private String cluster;
38

    
39
	private boolean simulation = false;
40

    
41
	private String oozieReportActionsCsv;
42

    
43
	@Override
44
	protected String obtainServiceId(final NodeToken token) {
45
		return getServiceLocator().getServiceId(HadoopService.class);
46
	}
47

    
48
	@Override
49
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
50
		String type = getJobType(getHadoopJob());
51

    
52
		log.info("submitting job " + getHadoopJob() + " type: " + type);
53

    
54
		job.setAction(type);
55
		job.getParameters().put("job.name", getHadoopJob());
56
		job.getParameters().put("cluster", cluster(token));
57
		job.getParameters().put("simulation", String.valueOf(isSimulation()));
58

    
59
		if (StringUtils.isNotBlank(getOozieReportActionsCsv())) {
60
			log.info("enabling oozie job report, param(s): " + getOozieReportActionsCsv());
61
			job.getParameters().put(OOZIE_REPORT_ACTIONS, getOozieReportActionsCsv());
62
		} else {
63
			log.warn("couldn't find param to enable oozie job report!");
64
		}
65

    
66
		job.getParameters().putAll(parseJsonParameters(token));
67
	}
68

    
69
	private String cluster(final NodeToken token) {
70
		if (token.getEnv().hasAttribute("cluster")) {
71
			String cluster = token.getEnv().getAttribute("cluster");
72
			log.info("found override value in wfEnv for 'cluster' param: " + cluster);
73
			return cluster;
74
		}
75
		return getCluster();
76
	}
77

    
78
	/**
79
	 * reads the job type for the given job name
80
	 *
81
	 * @param jobName
82
	 * @return
83
	 * @throws ISLookUpException
84
	 */
85
	private String getJobType(final String jobName) throws ISLookUpException {
86
		List<String> res =
87
				serviceLocator.getService(ISLookUpService.class).quickSearchProfile(
88
						"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName
89
						+ "']/@type/string()");
90
		if (res.isEmpty()) { throw new IllegalStateException("unable to find job type for job: " + jobName); }
91

    
92
		final HadoopJobType type = HadoopJobType.valueOf(Iterables.getOnlyElement(res));
93

    
94
		switch (type) {
95
		case mapreduce:
96
			return HadoopBlackboardActions.SUBMIT_MAPREDUCE_JOB.toString();
97
		case admin:
98
			return HadoopBlackboardActions.SUBMIT_ADMIN_JOB.toString();
99
		case oozie:
100
			return HadoopBlackboardActions.SUBMIT_OOZIE_JOB.toString();
101
		default:
102
			throw new IllegalStateException("undefined job type: " + type.toString());
103
		}
104
	}
105

    
106
	public String getHadoopJob() {
107
		return hadoopJob;
108
	}
109

    
110
	public void setHadoopJob(final String hadoopJob) {
111
		this.hadoopJob = hadoopJob;
112
	}
113

    
114
	public String getCluster() {
115
		return cluster;
116
	}
117

    
118
	public void setCluster(final String cluster) {
119
		this.cluster = cluster;
120
	}
121

    
122
	public boolean isSimulation() {
123
		return simulation;
124
	}
125

    
126
	public void setSimulation(final boolean simulation) {
127
		this.simulation = simulation;
128
	}
129

    
130
	public String getOozieReportActionsCsv() {
131
		return oozieReportActionsCsv;
132
	}
133

    
134
	public void setOozieReportActionsCsv(final String oozieReportActionsCsv) {
135
		this.oozieReportActionsCsv = oozieReportActionsCsv;
136
	}
137
}
(12-12/12)