Project

General

Profile

1
package eu.dnetlib.msro.workflows.sarasvati.loader;
2

    
3
import java.io.File;
4
import java.util.Comparator;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.PriorityBlockingQueue;
9
import java.util.concurrent.ScheduledExecutorService;
10
import java.util.concurrent.TimeUnit;
11

    
12
import javax.annotation.Resource;
13

    
14
import org.apache.commons.lang.math.NumberUtils;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17
import org.springframework.beans.factory.annotation.Required;
18

    
19
import com.googlecode.sarasvati.Graph;
20
import com.googlecode.sarasvati.GraphProcess;
21
import com.googlecode.sarasvati.mem.MemEngine;
22
import com.googlecode.sarasvati.mem.MemGraphProcess;
23

    
24
import eu.dnetlib.enabling.common.Stoppable;
25
import eu.dnetlib.enabling.common.StoppableDetails;
26
import eu.dnetlib.enabling.common.StoppableDetails.StopStatus;
27
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
28
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
29
import eu.dnetlib.miscutils.datetime.DateUtils;
30
import eu.dnetlib.msro.rmi.MSROException;
31
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
32
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
33

    
34
public class WorkflowExecutor implements Stoppable {
35

    
36
	private MemEngine engine;
37
	private GraphLoader graphLoader;
38
	private GraphProcessRegistry graphProcessRegistry;
39
	private ProfileToSarasvatiConverter profileToSarasvatiConverter;
40
	private ScheduledExecutorService queueConsumers;
41

    
42
	private boolean paused = false;
43

    
44
	@Resource
45
	private UniqueServiceLocator serviceLocator;
46

    
47
	public boolean isPaused() {
48
		return paused;
49
	}
50

    
51
	public void setPaused(final boolean paused) {
52
		this.paused = paused;
53
	}
54

    
55
	private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
56

    
57
	private PriorityBlockingQueue<GraphProcess> pendingProcs = new PriorityBlockingQueue<GraphProcess>(20, new Comparator<GraphProcess>() {
58

    
59
		@Override
60
		public int compare(final GraphProcess p1, final GraphProcess p2) {
61
			int n1 = NumberUtils.toInt(p1.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PRIORITY), WorkflowsConstants.DEFAULT_WF_PRIORITY);
62
			int n2 = NumberUtils.toInt(p2.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PRIORITY), WorkflowsConstants.DEFAULT_WF_PRIORITY);
63
			return NumberUtils.compare(n1, n2);
64
		}
65
	});
66

    
67
	public void init() {
68
		this.queueConsumers = Executors.newScheduledThreadPool(WorkflowsConstants.MAX_WF_THREADS);
69
		final int period = 60;
70
		final int step = period / WorkflowsConstants.MAX_WF_THREADS;
71

    
72
		for (int i = 0; i < WorkflowsConstants.MAX_WF_THREADS; i++) {
73
			this.queueConsumers.scheduleAtFixedRate(new Runnable() {
74

    
75
				@Override
76
				public void run() {
77
					if (isPaused()) {
78
					return;
79
					}
80

    
81
					final GraphProcess process = pendingProcs.poll();
82
					if (process != null) {
83
						log.info("Starting workflow: " + process);
84
						final long now = DateUtils.now();
85
						process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_START_DATE, now);
86
						process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_START_HUMAN_DATE, DateUtils.calculate_ISO8601(now));
87
						engine.startProcess(process);
88
					} else {
89
						log.debug("Process queue is empty");
90
					}
91
				}
92
			}, i * step, period, TimeUnit.SECONDS);
93
		}
94
	}
95

    
96
	public String startProcess(final String profileId) throws Exception {
97
		return startProcess(profileId, null);
98
	}
99

    
100
	public String startProcess(final String profileId, final Map<String, Object> params) throws Exception {
101
		final WfProfileDescriptor desc = profileToSarasvatiConverter.getSarasvatiWorkflow(profileId);
102

    
103
		if (isPaused()) {
104
			log.warn("Wf " + profileId + " not launched, because WorkflowExecutor is preparing for shutdown");
105
			throw new MSROException("WorkflowExecutor is preparing for shutdown");
106
		}
107
		if (!desc.isReady()) {
108
			log.warn("Wf " + profileId + " not launched, because it is not ready to start");
109
			throw new MSROException("Workflow " + profileId + " is not ready to start");
110
		}
111
		if (pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
112
			log.warn("Wf " + profileId + " not launched, Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
113
			throw new MSROException("Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
114
		}
115

    
116
		final File tmpFile = File.createTempFile("wftfs", null);
117
		try {
118
			final Graph graph = graphLoader.loadGraph(desc.getWorkflowXml());
119
			final GraphProcess process = new MemGraphProcess(graph);
120
			final String procId = graphProcessRegistry.registerProcess(process);
121

    
122
			graphProcessRegistry.associateProcessWithResource(process, profileId);
123

    
124
			process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROCESS_ID, procId);
125
			process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profileId);
126
			process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_NAME, desc.getName());
127
			process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_NAME, graph.getName());
128
			process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_FAMILY, desc.getType());
129
			process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PRIORITY, desc.getPriority());
130

    
131
			if (params != null) {
132
				for (Map.Entry<String, Object> e : params.entrySet()) {
133
					process.getEnv().setAttribute(e.getKey(), e.getValue());
134
				}
135
			}
136

    
137
			log.info("Process " + process + " in queue, priority=" + desc.getPriority());
138
			pendingProcs.put(process);
139

    
140
			return procId;
141
		} catch (Exception e) {
142
			log.error("Error parsing workflow xml: " + desc.getWorkflowXml(), e);
143
			throw new IllegalArgumentException("Error parsing workflow");
144
		} finally {
145
			tmpFile.delete();
146
		}
147
	}
148

    
149
	public void startMetaWorkflow(final String id, final boolean manual) throws Exception {
150
		final String query = "/*[.//RESOURCE_IDENTIFIER/@value='" + id + "']//CONFIGURATION[@status='EXECUTABLE']/WORKFLOW/@id/string()";
151

    
152
		final ISLookUpService lookup = serviceLocator.getService(ISLookUpService.class);
153

    
154
		final List<String> list = lookup.quickSearchProfile(query);
155

    
156
		if (list == null || list.isEmpty()) { throw new MSROException("Metaworkflow " + id + " not launched"); }
157

    
158
		for (String wfId : list) {
159
			final String q = "/*[.//RESOURCE_IDENTIFIER/@value='" + wfId + "']//CONFIGURATION/@start/string()";
160
			if (manual || lookup.getResourceProfileByQuery(q).equals("auto")) {
161
				startProcess(wfId);
162
			} else {
163
				log.warn("Worflow " + wfId + " can not be launched AUTOMATICALLY");
164
			}
165
		}
166
	}
167

    
168
	public GraphLoader getGraphLoader() {
169
		return graphLoader;
170
	}
171

    
172
	@Required
173
	public void setGraphLoader(final GraphLoader graphLoader) {
174
		this.graphLoader = graphLoader;
175
	}
176

    
177
	public MemEngine getEngine() {
178
		return engine;
179
	}
180

    
181
	@Required
182
	public void setEngine(final MemEngine engine) {
183
		this.engine = engine;
184
	}
185

    
186
	public GraphProcessRegistry getGraphProcessRegistry() {
187
		return graphProcessRegistry;
188
	}
189

    
190
	@Required
191
	public void setGraphProcessRegistry(final GraphProcessRegistry graphProcessRegistry) {
192
		this.graphProcessRegistry = graphProcessRegistry;
193
	}
194

    
195
	public ProfileToSarasvatiConverter getProfileToSarasvatiConverter() {
196
		return profileToSarasvatiConverter;
197
	}
198

    
199
	@Required
200
	public void setProfileToSarasvatiConverter(final ProfileToSarasvatiConverter profileToSarasvatiConverter) {
201
		this.profileToSarasvatiConverter = profileToSarasvatiConverter;
202
	}
203

    
204
	@Override
205
	public void stop() {
206
		this.paused = true;
207
	}
208

    
209
	@Override
210
	public void resume() {
211
		this.paused = false;
212
	}
213

    
214
	@Override
215
	public StoppableDetails getStopDetails() {
216
		final int count = graphProcessRegistry.countRunningWfs();
217

    
218
		StoppableDetails.StopStatus status = StopStatus.RUNNING;
219
		if (isPaused()) {
220
			if (count == 0) {
221
				status = StopStatus.STOPPED;
222
			} else {
223
				status = StopStatus.STOPPING;
224
			}
225
		}
226
		graphProcessRegistry.listIdentifiers();
227

    
228
		return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
229
	}
230

    
231
}
(8-8/8)