Project

General

Profile

1
package eu.dnetlib.msro.workflows.procs;
2

    
3
import java.io.StringReader;
4
import java.util.HashMap;
5
import java.util.Map;
6
import java.util.concurrent.Executors;
7
import java.util.concurrent.TimeUnit;
8

    
9
import eu.dnetlib.enabling.common.Stoppable;
10
import eu.dnetlib.enabling.common.StoppableDetails;
11
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
12
import eu.dnetlib.msro.workflows.graph.Graph;
13
import eu.dnetlib.msro.workflows.graph.GraphLoader;
14
import eu.dnetlib.msro.workflows.util.ProcessCallback;
15
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
16
import eu.dnetlib.rmi.enabling.ISLookUpService;
17
import eu.dnetlib.rmi.manager.MSROException;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.commons.lang3.math.NumberUtils;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.dom4j.Document;
23
import org.dom4j.Element;
24
import org.dom4j.io.SAXReader;
25
import org.springframework.beans.factory.annotation.Autowired;
26
import org.springframework.beans.factory.annotation.Required;
27

    
28
/**
29
 * Created by michele on 20/11/15.
30
 */
31
public class WorkflowExecutor implements Stoppable {
32

    
33
	private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
34

    
35
	@Autowired
36
	private UniqueServiceLocator serviceLocator;
37

    
38
	private GraphLoader graphLoader;
39
	private ProcessRegistry processRegistry;
40
	private ProcessFactory processFactory;
41
	private ProcessEngine processEngine;
42
	private boolean paused = false;
43

    
44
	public void init() {
45
		Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
46

    
47
			@Override
48
			public void run() {
49
				if (isPaused() || WorkflowExecutor.this.processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE) { return; }
50

    
51
				final WorkflowProcess process = WorkflowExecutor.this.processRegistry.nextProcessToStart();
52
				if (process != null) {
53
					WorkflowExecutor.this.processEngine.startProcess(process);
54
				} else {
55
					log.debug("WorkflowProcess queue is empty");
56
				}
57
			}
58
		}, 10, 10, TimeUnit.SECONDS);
59
	}
60

    
61
	public String startRepoHiWorkflow(final String profileId, final String dsId, final String iface, final ProcessCallback processCallback) throws Exception {
62

    
63
		if (isPaused()) {
64
			log.warn("Wf " + profileId + " not launched, because WorkflowExecutor is preparing for shutdown");
65
			throw new MSROException("WorkflowExecutor is preparing for shutdown");
66
		}
67

    
68
		try {
69
			final String profile = this.serviceLocator.getService(ISLookUpService.class).getResourceProfile(profileId);
70
			final Document doc = new SAXReader().read(new StringReader(profile));
71

    
72
			final String name = doc.valueOf("//WORKFLOW_NAME");
73
			final String family = doc.valueOf("//WORKFLOW_FAMILY");
74
			final int priority = NumberUtils.toInt("//WORKFLOW_PRIORITY", WorkflowsConstants.DEFAULT_WF_PRIORITY);
75
			final boolean isReady = doc.valueOf("//CONFIGURATION/@status").equals(WorkflowsConstants.WorkflowStatus.EXECUTABLE.toString());
76
			final boolean isDisabled = doc.valueOf("//CONFIGURATION/@start").equals("disabled");
77

    
78
			if (!isReady || isDisabled) {
79
				log.warn("Wf " + profileId + " not launched, because it is not ready to start or it is disabled");
80
				throw new MSROException("Workflow " + profileId + " is not ready to start");
81
			}
82

    
83
			final Map<String, String> globalParams = new HashMap<String, String>();
84
			for (final Object o : doc.selectNodes("//CONFIGURATION/PARAMETERS/PARAM")) {
85
				final Element p = (Element) o;
86
				globalParams.put(p.valueOf("@name"), p.getTextTrim());
87
			}
88

    
89
			final Graph graph = this.graphLoader.loadGraph(doc, globalParams);
90

    
91
			final WorkflowProcess process =
92
					this.processFactory.newProcess(name, family, dsId, iface, graph, priority, profileId, false, globalParams, processCallback);
93

    
94
			return this.processRegistry.registerProcess(process, profileId);
95
		} catch (final Exception e) {
96
			log.error("Error parsing workflow: " + profileId, e);
97
			throw new MSROException("Error parsing workflow");
98
		}
99
	}
100

    
101
	public String startWorkflow(final String profileId, final ProcessCallback processCallback) throws Exception {
102

    
103
		if (isPaused()) {
104
			log.warn("Wf " + profileId + " not launched, because WorkflowExecutor is preparing for shutdown");
105
			throw new MSROException("WorkflowExecutor is preparing for shutdown");
106
		}
107

    
108
		try {
109
			final String profile = this.serviceLocator.getService(ISLookUpService.class).getResourceProfile(profileId);
110
			final Document doc = new SAXReader().read(new StringReader(profile));
111

    
112
			final String name = doc.valueOf("//WORKFLOW_NAME");
113
			final String family = doc.valueOf("//WORKFLOW_FAMILY");
114
			final int priority = NumberUtils.toInt("//WORKFLOW_PRIORITY", WorkflowsConstants.DEFAULT_WF_PRIORITY);
115
			final boolean isReady = doc.valueOf("//CONFIGURATION/@status").equals(WorkflowsConstants.WorkflowStatus.EXECUTABLE.toString());
116
			final boolean isDisabled = doc.valueOf("//CONFIGURATION/@start").equals("disabled");
117
			final String dsId = doc.valueOf("//DATASOURCE/@id");
118
			final String iface = doc.valueOf("//DATASOURCE/@interface");
119

    
120
			if (!isReady || isDisabled) {
121
				log.warn("Wf " + profileId + " not launched, because it is not ready to start or it is disabled");
122
				throw new MSROException("Workflow " + profileId + " is not ready to start");
123
			}
124

    
125
			final Map<String, String> globalParams = new HashMap<String, String>();
126
			for (final Object o : doc.selectNodes("//CONFIGURATION/PARAMETERS/PARAM")) {
127
				final Element p = (Element) o;
128
				globalParams.put(p.valueOf("@name"), p.getTextTrim());
129
			}
130

    
131
			final Graph graph = this.graphLoader.loadGraph(doc, globalParams);
132

    
133
			final WorkflowProcess process =
134
					this.processFactory.newProcess(name, family, dsId, iface, graph, priority, profileId, false, globalParams, processCallback);
135

    
136
			return this.processRegistry.registerProcess(process, profileId);
137
		} catch (final Exception e) {
138
			log.error("Error parsing workflow: " + profileId, e);
139
			throw new MSROException("Error parsing workflow");
140
		}
141
	}
142

    
143
	public String startWorkflowTemplate(final String profileId,
144
			final String name,
145
			final String family,
146
			final int priority,
147
			final String dsId,
148
			final String iface,
149
			final Map<String, String> params,
150
			final ProcessCallback processCallback) throws Exception {
151

    
152
		if (isPaused()) {
153
			log.warn("Wf template " + profileId + " not launched, because WorkflowExecutor is preparing for shutdown");
154
			throw new MSROException("WorkflowExecutor is preparing for shutdown");
155
		}
156

    
157
		try {
158
			final String profile = this.serviceLocator.getService(ISLookUpService.class).getResourceProfile(profileId);
159
			final Document doc = new SAXReader().read(new StringReader(profile));
160

    
161
			final Map<String, String> globalParams = new HashMap<String, String>();
162
			for (final Object o : doc.selectNodes("//CONFIGURATION/PARAMETERS/PARAM")) {
163
				final Element p = (Element) o;
164
				final String pname = p.valueOf("@name");
165
				if (StringUtils.isNotBlank(params.get(pname))) {
166
					globalParams.put(pname, params.get(pname));
167
				} else if (p.selectSingleNode("@default") != null) {
168
					globalParams.put(pname, p.valueOf("@default"));
169
				} else if (!StringUtils.equalsIgnoreCase(p.valueOf("@required"), "true")) {
170
					globalParams.put(pname, "");
171
				} else {
172
					throw new MSROException("A required parameter is missing in wf template:" + pname);
173
				}
174
			}
175
			final Graph graph = this.graphLoader.loadGraph(doc, globalParams);
176

    
177
			final WorkflowProcess process =
178
					this.processFactory.newProcess(name, family, dsId, iface, graph, priority, profileId, true, globalParams, processCallback);
179

    
180
			return this.processRegistry.registerProcess(process);
181
		} catch (final Exception e) {
182
			log.error("Error parsing workflow template: " + profileId, e);
183
			throw new MSROException("Error parsing workflow template", e);
184
		}
185
	}
186

    
187
	@Override
188
	public void stop() {
189
		this.paused = true;
190
	}
191

    
192
	@Override
193
	public void resume() {
194
		this.paused = false;
195
	}
196

    
197
	@Override
198
	public StoppableDetails getStopDetails() {
199
		final int count = this.processRegistry.countRunningWfs();
200

    
201
		final StoppableDetails.StopStatus status =
202
				isPaused() ? (count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING) : StoppableDetails.StopStatus.RUNNING;
203

    
204
		return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
205
	}
206

    
207
	public ProcessRegistry getProcessRegistry() {
208
		return this.processRegistry;
209
	}
210

    
211
	@Required
212
	public void setProcessRegistry(final ProcessRegistry processRegistry) {
213
		this.processRegistry = processRegistry;
214
	}
215

    
216
	public GraphLoader getGraphLoader() {
217
		return this.graphLoader;
218
	}
219

    
220
	@Required
221
	public void setGraphLoader(final GraphLoader graphLoader) {
222
		this.graphLoader = graphLoader;
223
	}
224

    
225
	public ProcessFactory getProcessFactory() {
226
		return this.processFactory;
227
	}
228

    
229
	@Required
230
	public void setProcessFactory(final ProcessFactory processFactory) {
231
		this.processFactory = processFactory;
232
	}
233

    
234
	public ProcessEngine getProcessEngine() {
235
		return this.processEngine;
236
	}
237

    
238
	@Required
239
	public void setProcessEngine(final ProcessEngine processEngine) {
240
		this.processEngine = processEngine;
241
	}
242

    
243
	public boolean isPaused() {
244
		return this.paused;
245
	}
246

    
247
	public void setPaused(final boolean paused) {
248
		this.paused = paused;
249
	}
250
}
(7-7/8)