Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.hadoop;
2

    
3
import java.util.HashMap;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Map.Entry;
7
import javax.annotation.Resource;
8

    
9
import com.google.common.collect.Iterables;
10
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
11
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
12
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
13
import eu.dnetlib.msro.workflows.procs.Env;
14
import eu.dnetlib.msro.workflows.procs.Token;
15
import eu.dnetlib.rmi.data.hadoop.HadoopBlackboardActions;
16
import eu.dnetlib.rmi.data.hadoop.HadoopJobType;
17
import eu.dnetlib.rmi.data.hadoop.HadoopService;
18
import eu.dnetlib.rmi.enabling.ISLookUpException;
19
import eu.dnetlib.rmi.enabling.ISLookUpService;
20
import eu.dnetlib.rmi.manager.MSROException;
21
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23

    
24
public class SubmitHadoopJobNode extends BlackboardJobNode {
25

    
26
	/**
27
	 * logger.
28
	 */
29
	private static final Log log = LogFactory.getLog(SubmitHadoopJobNode.class);
30

    
31
	@Resource
32
	private UniqueServiceLocator serviceLocator;
33

    
34
	private String hadoopJob;
35

    
36
	private String cluster;
37

    
38
	private boolean simulation = false;
39

    
40
	private Map<String, Object> jobParams = new HashMap<>();
41

    
42
	@Override
43
	protected String obtainServiceId(final Env env) {
44
		return getServiceLocator().getServiceId(HadoopService.class);
45
	}
46

    
47
	@Override
48
	protected void prepareJob(final BlackboardJob job, final Token token) throws Exception {
49
		String type = getJobType(getHadoopJob());
50

    
51
		log.info("submitting job " + getHadoopJob() + " type: " + type);
52

    
53
		job.setAction(type);
54
		job.getParameters().put("job.name", getHadoopJob());
55
		job.getParameters().put("cluster", cluster(token));
56
		job.getParameters().put("simulation", String.valueOf(isSimulation()));
57

    
58
		for (Entry<String, Object> e : getJobParams().entrySet()) {
59
			try {
60
				job.getParameters().put(e.getKey(), e.getValue().toString());
61
			} catch (Throwable ex) {
62
				throw new MSROException(String.format("missing value for map key '%s'", e.getKey()));
63
			}
64
		}
65
	}
66

    
67
	private String cluster(final Token token) {
68
		if (token.getEnv().hasAttribute("cluster")) {
69
			String cluster = token.getEnv().getAttribute("cluster", String.class);
70
			log.info("found override value in wfEnv for 'cluster' param: " + cluster);
71
			return cluster;
72
		}
73
		return getCluster();
74
	}
75

    
76
	/**
77
	 * reads the job type for the given job name
78
	 *
79
	 * @param jobName
80
	 * @return
81
	 * @throws ISLookUpException
82
	 */
83
	private String getJobType(final String jobName) throws ISLookUpException {
84
		List<String> res =
85
				serviceLocator.getService(ISLookUpService.class).quickSearchProfile(
86
						"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName
87
								+ "']/@type/string()");
88
		if (res.isEmpty()) { throw new IllegalStateException("unable to find job type for job: " + jobName); }
89

    
90
		final HadoopJobType type = HadoopJobType.valueOf(Iterables.getOnlyElement(res));
91

    
92
		switch (type) {
93
		case mapreduce:
94
			return HadoopBlackboardActions.SUBMIT_MAPREDUCE_JOB.toString();
95
		case admin:
96
			return HadoopBlackboardActions.SUBMIT_ADMIN_JOB.toString();
97
		case oozie:
98
			return HadoopBlackboardActions.SUBMIT_OOZIE_JOB.toString();
99
		default:
100
			throw new IllegalStateException("undefined job type: " + type.toString());
101
		}
102
	}
103

    
104
	public String getHadoopJob() {
105
		return hadoopJob;
106
	}
107

    
108
	public void setHadoopJob(final String hadoopJob) {
109
		this.hadoopJob = hadoopJob;
110
	}
111

    
112
	public String getCluster() {
113
		return cluster;
114
	}
115

    
116
	public void setCluster(final String cluster) {
117
		this.cluster = cluster;
118
	}
119

    
120
	public boolean isSimulation() {
121
		return simulation;
122
	}
123

    
124
	public void setSimulation(final boolean simulation) {
125
		this.simulation = simulation;
126
	}
127

    
128
	public Map<String, Object> getJobParams() {
129
		return jobParams;
130
	}
131

    
132
	public void setJobParams(final Map<String, Object> jobParams) {
133
		this.jobParams = jobParams;
134
	}
135

    
136
}
(22-22/22)