Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.data.hadoop.action;
2
3 32863 claudio.at
import java.net.URI;
4
import java.util.List;
5
import java.util.Map;
6
7
import org.apache.commons.lang.StringUtils;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.fs.Path;
11
import org.springframework.beans.factory.annotation.Autowired;
12
13 26600 sandro.lab
import com.google.common.collect.Iterables;
14 31337 claudio.at
import com.google.common.collect.Sets;
15 32863 claudio.at
16 31337 claudio.at
import eu.dnetlib.data.hadoop.HadoopClientMap;
17
import eu.dnetlib.data.hadoop.JobRegistry;
18 26600 sandro.lab
import eu.dnetlib.data.hadoop.config.ClusterName;
19
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
20 31337 claudio.at
import eu.dnetlib.data.hadoop.utils.JobProfile;
21 26600 sandro.lab
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22 32783 andrea.man
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23 26600 sandro.lab
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
24
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
25 31337 claudio.at
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
26
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
27 26600 sandro.lab
import eu.dnetlib.miscutils.datetime.DateUtils;
28
import eu.dnetlib.miscutils.functional.hash.Hashing;
29
30
public abstract class AbstractSubmitAction extends AbstractHadoopAction {
31
32 32863 claudio.at
	private static final Log log = LogFactory.getLog(AbstractSubmitAction.class); // NOPMD by marko on 11/24/08 5:02 PM
33 26600 sandro.lab
34 32863 claudio.at
	@Autowired
35
	protected HadoopClientMap hadoopClientMap;
36 31337 claudio.at
37 32863 claudio.at
	@Autowired
38
	protected JobRegistry jobRegistry;
39 31337 claudio.at
40 32863 claudio.at
	protected abstract void submit(final JobCompletion callback, final BlackboardJob job, final String jobName, final JobProfile jobProfile)
41
			throws HadoopServiceException;
42 26600 sandro.lab
43 32863 claudio.at
	@Override
44
	protected void executeAsync(final BlackboardServerHandler handler, final BlackboardJob job) throws HadoopServiceException {
45
		final String jobName = job.getParameters().get("job.name");
46
		final JobProfile jobProfile = loadISJobConfiguration(jobName, job.getParameters());
47 26600 sandro.lab
48 32863 claudio.at
		validateJobParams(handler, job, jobName, jobProfile);
49 26600 sandro.lab
50 32863 claudio.at
		if (!isSimulation(job)) {
51 26600 sandro.lab
52 32863 claudio.at
			submit(newCompletionCallback(handler, job), job, jobName, jobProfile);
53
			updateJobStatus(jobName);
54
		} else {
55
			log.info(String.format("simulating job: '%s', done!", jobName));
56
			handler.done(job);
57
		}
58
	}
59 26600 sandro.lab
60 32863 claudio.at
	private boolean isSimulation(final BlackboardJob job) {
61
		if (!job.getParameters().containsKey("simulation")) { return false; }
62
		final String sim = job.getParameters().get("simulation");
63
		return !StringUtils.isBlank(sim) && sim.equals("true");
64
	}
65 26600 sandro.lab
66 32863 claudio.at
	protected void updateJobStatus(final String jobName) {
67
		incrementCumulativeRun(jobName);
68
		incrementRunningJobs(jobName);
69
		updateDate(jobName);
70
	}
71 26600 sandro.lab
72 32863 claudio.at
	protected void incrementRunningJobs(final String jobName) {
73
		log.debug("increment #running jobs: " + jobName);
74
		updateCountElement(jobName, "RUNNING_INSTANCES", "+ 1");
75
	}
76 26600 sandro.lab
77 32863 claudio.at
	protected void decrementRunningJobs(final String jobName) {
78
		log.debug("decrement #running jobs: " + jobName);
79
		updateCountElement(jobName, "RUNNING_INSTANCES", "- 1");
80
	}
81 26600 sandro.lab
82 32863 claudio.at
	protected void incrementCumulativeRun(final String jobName) {
83
		log.debug("increment #cumulative runs for job: " + jobName);
84
		updateCountElement(jobName, "CUMULATIVE_RUN", "+ 1");
85
	}
86 26600 sandro.lab
87 32863 claudio.at
	protected void updateDate(final String jobName) {
88
		log.info("increment last submission date for job: " + jobName);
89
		final String xquery =
90
				"for $x in collection('')/RESOURCE_PROFILE[" + ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='"
91
						+ jobName + "'] " + " return update value $x//LAST_SUBMISSION_DATE/@value with '" + DateUtils.now_ISO8601() + "' ";
92 26600 sandro.lab
93 32863 claudio.at
		executeXUpdate(xquery);
94
	}
95 31337 claudio.at
96 32863 claudio.at
	private void updateCountElement(final String jobName, final String element, final String delta) {
97
		final String xquery =
98
				"let $x := //RESOURCE_PROFILE[" + ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName
99
						+ "'], $tot := $x//STATUS/" + element + "/@value/number() " + delta + " return update replace $x//STATUS/" + element + " with <"
100
						+ element + " value='{$tot}' />";
101 31337 claudio.at
102 32863 claudio.at
		executeXUpdate(xquery);
103
	}
104 31337 claudio.at
105 32863 claudio.at
	protected String newJobId(final ClusterName clusterName, final Object internalId) {
106
		return "job-" + Hashing.md5(clusterName.toString() + internalId.toString());
107
	}
108 31337 claudio.at
109 32863 claudio.at
	protected String getDefaultLibPath(final String defaultFs) throws HadoopServiceException {
110
		try {
111
			final String libPath = queryForServiceProperty("defaultLibPath");
112
			final Path path = new Path(URI.create(defaultFs + libPath));
113 26600 sandro.lab
114 32863 claudio.at
			return path.toString();
115
		} catch (ISLookUpException e) {
116
			throw new HadoopServiceException("unable to get default lib path", e);
117
		}
118
	}
119 26600 sandro.lab
120 32863 claudio.at
	protected void validateJobParams(final BlackboardServerHandler handler, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
121
			throws HadoopServiceException {
122
		if (!bbJob.getParameters().keySet().containsAll(jobProfile.getRequiredParams())) {
123
			String msg =
124
					"required parameter is missing for job: " + jobName + ", required params: " + jobProfile.getRequiredParams() + "\n\nmissing params: "
125
							+ Sets.difference(jobProfile.getRequiredParams(), bbJob.getParameters().keySet());
126
			log.error(msg);
127
			HadoopServiceException e = new HadoopServiceException(msg);
128
			handler.failed(bbJob, e);
129
			throw e;
130
		}
131
	}
132 26600 sandro.lab
133 32863 claudio.at
	protected JobCompletion newCompletionCallback(final BlackboardServerHandler handler, final BlackboardJob bbJob) {
134
		final String jobName = bbJob.getParameters().get("job.name");
135
		return new JobCompletion() {
136 32818 claudio.at
137 32863 claudio.at
			@Override
138 60355 claudio.at
			public void done(final Map<String, String> report) {
139
				bbJob.getParameters().putAll(report);
140
				log.info(String.format("%s completed successfully, returning %s output params", jobName, report.size()));
141 32863 claudio.at
				handler.done(bbJob);
142
				decrementRunningJobs(jobName);
143
			}
144 32818 claudio.at
145 32863 claudio.at
			@Override
146 60355 claudio.at
			public void failed(final Map<String, String> report, final String msg, final Throwable e) {
147
				bbJob.getParameters().putAll(report);
148
				log.error(String.format("%s failed, returning %s output params", jobName, report.size()));
149 32863 claudio.at
				handler.failed(bbJob, e);
150
				decrementRunningJobs(jobName);
151
			}
152
		};
153
	}
154 32818 claudio.at
155 32863 claudio.at
	private String queryForServiceProperty(final String key) throws ISLookUpException {
156
		return getServiceConfigValue("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='HadoopServiceResourceType'] return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='"
157
				+ key + "']/@value/string()");
158
	}
159 32818 claudio.at
160 32863 claudio.at
	private String getServiceConfigValue(final String xquery) throws ISLookUpException {
161
		log.debug("quering for service property: " + xquery);
162
		final List<String> urls = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
163
		if (urls == null || urls.size() != 1) { throw new IllegalStateException("unable to find unique service property, xquery: " + xquery); }
164
		return Iterables.getOnlyElement(urls);
165
	}
166 32818 claudio.at
167 32863 claudio.at
	private boolean executeXUpdate(final String xupdate) {
168
		try {
169
			log.debug("running xupdate: " + xupdate);
170
			return serviceLocator.getService(ISRegistryService.class).executeXUpdate(xupdate);
171
		} catch (ISRegistryException e) {
172
			log.error("unable to run xupdate: " + xupdate, e);
173
			return false;
174
		}
175
	}
176
177 26600 sandro.lab
}