package eu.dnetlib.data.hadoop.action;

import java.net.URI;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.HadoopClientMap;
import eu.dnetlib.data.hadoop.JobRegistry;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.miscutils.functional.hash.Hashing;

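/**
 * Base class for blackboard actions that submit Hadoop jobs: it resolves the job configuration from the
 * HadoopJobConfigurationDSResourceType profile, validates the required parameters, delegates the actual submission
 * to a subclass and keeps the profile's run counters and last submission date up to date via XUpdate.
 */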
public abstract class AbstractSubmitAction extends AbstractHadoopAction {

	private static final Log log = LogFactory.getLog(AbstractSubmitAction.class); // NOPMD by marko on 11/24/08 5:02 PM

	@Autowired
	protected HadoopClientMap hadoopClientMap;

	@Autowired
	protected JobRegistry jobRegistry;

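	/**
	 * Performs the cluster-specific submission; implementations are expected to invoke the given callback when the
	 * submitted job completes or fails.
	 */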
	protected abstract void submit(final JobCompletion callback, final BlackboardJob job, final String jobName, final JobProfile jobProfile)
			throws HadoopServiceException;

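	/**
	 * Resolves the job profile named by the "job.name" parameter, validates the blackboard parameters against it and
	 * submits the job; when the "simulation" parameter is set to "true" the action completes immediately without
	 * submitting anything.
	 */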
	@Override
	protected void executeAsync(final BlackboardServerHandler handler, final BlackboardJob job) throws HadoopServiceException {
		final String jobName = job.getParameters().get("job.name");
		final JobProfile jobProfile = loadISJobConfiguration(jobName, job.getParameters());

		validateJobParams(handler, job, jobName, jobProfile);

		if (!isSimulation(job)) {
			submit(newCompletionCallback(handler, job), job, jobName, jobProfile);
			updateJobStatus(jobName);
		} else {
			log.info(String.format("simulating job: '%s', done!", jobName));
			handler.done(job);
		}
	}

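	/**
	 * @return true when the blackboard job carries a "simulation" parameter set to "true".
	 */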
	private boolean isSimulation(final BlackboardJob job) {
		if (!job.getParameters().containsKey("simulation")) { return false; }
		final String sim = job.getParameters().get("simulation");
		return !StringUtils.isBlank(sim) && sim.equals("true");
	}

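	/**
	 * Updates the job profile after a submission: cumulative runs, running instances and last submission date.
	 */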
	protected void updateJobStatus(final String jobName) {
		incrementCumulativeRun(jobName);
		incrementRunningJobs(jobName);
		updateDate(jobName);
	}

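	/**
	 * Increments the RUNNING_INSTANCES counter in the job profile; decrementRunningJobs and incrementCumulativeRun
	 * below update the other STATUS counters in the same way.
	 */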
	protected void incrementRunningJobs(final String jobName) {
		log.debug("increment #running jobs: " + jobName);
		updateCountElement(jobName, "RUNNING_INSTANCES", "+ 1");
	}

	protected void decrementRunningJobs(final String jobName) {
		log.debug("decrement #running jobs: " + jobName);
		updateCountElement(jobName, "RUNNING_INSTANCES", "- 1");
	}

	protected void incrementCumulativeRun(final String jobName) {
		log.debug("increment #cumulative runs for job: " + jobName);
		updateCountElement(jobName, "CUMULATIVE_RUN", "+ 1");
	}

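	/**
	 * Sets the LAST_SUBMISSION_DATE attribute of the job profile to the current timestamp.
	 */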
	protected void updateDate(final String jobName) {
		log.info("updating last submission date for job: " + jobName);
		final String xquery =
				"for $x in collection('')/RESOURCE_PROFILE[" + ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='"
						+ jobName + "'] " + " return update value $x//LAST_SUBMISSION_DATE/@value with '" + DateUtils.now_ISO8601() + "' ";

		executeXUpdate(xquery);
	}

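	/**
	 * Applies the given delta (e.g. "+ 1" or "- 1") to the named counter in the STATUS section of the job profile
	 * via XUpdate.
	 */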
	private void updateCountElement(final String jobName, final String element, final String delta) {
		final String xquery =
				"let $x := //RESOURCE_PROFILE[" + ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName
						+ "'], $tot := $x//STATUS/" + element + "/@value/number() " + delta + " return update replace $x//STATUS/" + element + " with <"
						+ element + " value='{$tot}' />";

		executeXUpdate(xquery);
	}

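	/**
	 * Builds a "job-" prefixed identifier by hashing the cluster name together with the cluster-internal job id.
	 */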
	protected String newJobId(final ClusterName clusterName, final Object internalId) {
		return "job-" + Hashing.md5(clusterName.toString() + internalId.toString());
	}

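	/**
	 * Resolves the default job library path on the given filesystem, reading the "defaultLibPath" property from the
	 * HadoopService profile.
	 */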
	protected String getDefaultLibPath(final String defaultFs) throws HadoopServiceException {
		try {
			final String libPath = queryForServiceProperty("defaultLibPath");
			final Path path = new Path(URI.create(defaultFs + libPath));

			return path.toString();
		} catch (ISLookUpException e) {
			throw new HadoopServiceException("unable to get default lib path", e);
		}
	}

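	/**
	 * Verifies that the blackboard job provides every parameter required by the job profile; if any is missing, the
	 * blackboard job is marked as failed and a HadoopServiceException is thrown.
	 */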
	protected void validateJobParams(final BlackboardServerHandler handler, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
			throws HadoopServiceException {
		if (!bbJob.getParameters().keySet().containsAll(jobProfile.getRequiredParams())) {
			String msg =
					"required parameter is missing for job: " + jobName + ", required params: " + jobProfile.getRequiredParams() + "\n\nmissing params: "
							+ Sets.difference(jobProfile.getRequiredParams(), bbJob.getParameters().keySet());
			log.error(msg);
			HadoopServiceException e = new HadoopServiceException(msg);
			handler.failed(bbJob, e);
			throw e;
		}
	}

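	/**
	 * Creates the callback invoked when the submitted job terminates: it copies the job report into the blackboard
	 * parameters, notifies the blackboard handler and decrements the RUNNING_INSTANCES counter.
	 */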
	protected JobCompletion newCompletionCallback(final BlackboardServerHandler handler, final BlackboardJob bbJob) {
		final String jobName = bbJob.getParameters().get("job.name");
		return new JobCompletion() {

			@Override
			public void done(final Map<String, String> report) {
				bbJob.getParameters().putAll(report);
				log.info(String.format("%s completed successfully, returning %s output params", jobName, report.size()));
				handler.done(bbJob);
				decrementRunningJobs(jobName);
			}

			@Override
			public void failed(final Map<String, String> report, final String msg, final Throwable e) {
				bbJob.getParameters().putAll(report);
				log.error(String.format("%s failed, returning %s output params: %s", jobName, report.size(), msg), e);
				handler.failed(bbJob, e);
				decrementRunningJobs(jobName);
			}
		};
	}

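	/**
	 * Reads a single SERVICE_PROPERTIES value from the HadoopService profile registered in the IS.
	 */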
	private String queryForServiceProperty(final String key) throws ISLookUpException {
		return getServiceConfigValue("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='HadoopServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@key='"
				+ key + "']/@value/string()");
	}

	private String getServiceConfigValue(final String xquery) throws ISLookUpException {
		log.debug("querying for service property: " + xquery);
		final List<String> urls = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
		if (urls == null || urls.size() != 1) { throw new IllegalStateException("unable to find unique service property, xquery: " + xquery); }
		return Iterables.getOnlyElement(urls);
	}

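	/**
	 * Runs an XUpdate statement against the IS registry; returns false (after logging the error) when it fails.
	 */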
	private boolean executeXUpdate(final String xupdate) {
		try {
			log.debug("running xupdate: " + xupdate);
			return serviceLocator.getService(ISRegistryService.class).executeXUpdate(xupdate);
		} catch (ISRegistryException e) {
			log.error("unable to run xupdate: " + xupdate, e);
			return false;
		}
	}

}