package eu.dnetlib.msro.workflows.hadoop;

import java.util.List;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.common.collect.Iterables;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
import eu.dnetlib.data.hadoop.rmi.HadoopJobType;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
public class SubmitHadoopJobNode extends BlackboardJobNode {
23

    
24
	/**
25
	 * logger.
26
	 */
27
	private static final Log log = LogFactory.getLog(SubmitHadoopJobNode.class);
28

    
29
	@Resource
30
	private UniqueServiceLocator serviceLocator;
31

    
32
	private String hadoopJob;
33

    
34
	private String cluster;
35

    
36
	private boolean simulation = false;
37

    
38
	@Override
39
	protected String obtainServiceId(final NodeToken token) {
40
		return getServiceLocator().getServiceId(HadoopService.class);
41
	}
42

    
43
	@Override
44
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
45
		String type = getJobType(getHadoopJob());
46

    
47
		log.info("submitting job " + getHadoopJob() + " type: " + type);
48

    
49
		job.setAction(type);
50
		job.getParameters().put("job.name", getHadoopJob());
51
		job.getParameters().put("cluster", cluster(token));
52
		job.getParameters().put("simulation", String.valueOf(isSimulation()));
53

    
54
		job.getParameters().putAll(parseJsonParameters(token));
55
	}
56

    
57
	private String cluster(final NodeToken token) {
58
		if (token.getEnv().hasAttribute("cluster")) {
59
			String cluster = token.getEnv().getAttribute("cluster");
60
			log.info("found override value in wfEnv for 'cluster' param: " + cluster);
61
			return cluster;
62
		}
63
		return getCluster();
64
	}
65

    
66
	/**
67
	 * reads the job type for the given job name
68
	 *
69
	 * @param jobName
70
	 * @return
71
	 * @throws ISLookUpException
72
	 */
73
	private String getJobType(final String jobName) throws ISLookUpException {
74
		List<String> res =
75
				serviceLocator.getService(ISLookUpService.class).quickSearchProfile(
76
						"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType']//HADOOP_JOB[./@name='" + jobName
77
						+ "']/@type/string()");
78
		if (res.isEmpty()) { throw new IllegalStateException("unable to find job type for job: " + jobName); }
79

    
80
		final HadoopJobType type = HadoopJobType.valueOf(Iterables.getOnlyElement(res));
81

    
82
		switch (type) {
83
		case mapreduce:
84
			return HadoopBlackboardActions.SUBMIT_MAPREDUCE_JOB.toString();
85
		case admin:
86
			return HadoopBlackboardActions.SUBMIT_ADMIN_JOB.toString();
87
		case oozie:
88
			return HadoopBlackboardActions.SUBMIT_OOZIE_JOB.toString();
89
		default:
90
			throw new IllegalStateException("undefined job type: " + type.toString());
91
		}
92
	}
93

    
94
	public String getHadoopJob() {
95
		return hadoopJob;
96
	}
97

    
98
	public void setHadoopJob(final String hadoopJob) {
99
		this.hadoopJob = hadoopJob;
100
	}
101

    
102
	public String getCluster() {
103
		return cluster;
104
	}
105

    
106
	public void setCluster(final String cluster) {
107
		this.cluster = cluster;
108
	}
109

    
110
	public boolean isSimulation() {
111
		return simulation;
112
	}
113

    
114
	public void setSimulation(final boolean simulation) {
115
		this.simulation = simulation;
116
	}
117
}