Project

General

Profile

1
package eu.dnetlib.msro.workflows.sarasvati.loader;
2

    
3
import java.io.File;
4
import java.util.Comparator;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.PriorityBlockingQueue;
9
import java.util.concurrent.ScheduledExecutorService;
10
import java.util.concurrent.TimeUnit;
11

    
12
import javax.annotation.Resource;
13

    
14
import org.apache.commons.lang.math.NumberUtils;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17
import org.springframework.beans.factory.annotation.Required;
18

    
19
import com.googlecode.sarasvati.Graph;
20
import com.googlecode.sarasvati.GraphProcess;
21
import com.googlecode.sarasvati.mem.MemEngine;
22
import com.googlecode.sarasvati.mem.MemGraphProcess;
23

    
24
import eu.dnetlib.enabling.common.Stoppable;
25
import eu.dnetlib.enabling.common.StoppableDetails;
26
import eu.dnetlib.enabling.common.StoppableDetails.StopStatus;
27
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
28
import eu.dnetlib.enabling.tools.ServiceLocator;
29
import eu.dnetlib.miscutils.datetime.DateUtils;
30
import eu.dnetlib.msro.rmi.MSROException;
31
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
32
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
33

    
34
public class WorkflowExecutor implements Stoppable {
35
	private MemEngine engine;
36
	private GraphLoader graphLoader;
37
	private GraphProcessRegistry graphProcessRegistry;
38
	private ProfileToSarasvatiConverter profileToSarasvatiConverter;
39
	private ScheduledExecutorService queueConsumers;
40
	
41
	private boolean paused = false;
42
	
43
	@Resource(name="lookupLocator")
44
	private ServiceLocator<ISLookUpService> lookupLocator;
45
	
46
	public boolean isPaused() {
47
		return paused;
48
	}
49

    
50
	public void setPaused(boolean paused) {
51
		this.paused = paused;
52
	}
53

    
54
	private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
55
		
56
	private PriorityBlockingQueue<GraphProcess> pendingProcs = new PriorityBlockingQueue<GraphProcess>(20, new Comparator<GraphProcess>() {
57
		
58
		@Override
59
		public int compare(GraphProcess p1, GraphProcess p2) {
60
			int n1 = NumberUtils.toInt(p1.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PRIORITY), WorkflowsConstants.DEFAULT_WF_PRIORITY);
61
			int n2 = NumberUtils.toInt(p2.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PRIORITY), WorkflowsConstants.DEFAULT_WF_PRIORITY);
62
			return NumberUtils.compare(n1, n2);
63
		}
64
	});
65
		
66
	public void init() {
67
		this.queueConsumers = Executors.newScheduledThreadPool(WorkflowsConstants.MAX_WF_THREADS);
68
		final int period = 60;
69
		final int step   = period / WorkflowsConstants.MAX_WF_THREADS;
70
		
71
		for (int i=0; i<WorkflowsConstants.MAX_WF_THREADS; i++) {
72
			this.queueConsumers.scheduleAtFixedRate(new Runnable() {
73
				@Override
74
				public void run() {
75
					if (isPaused()) {
76
						return;
77
					}
78
						
79
					final GraphProcess process = pendingProcs.poll();
80
					if (process != null) {
81
						log.info("Starting workflow: " + process);
82
						final long now = DateUtils.now();
83
						process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_START_DATE, now);
84
						process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_START_HUMAN_DATE, DateUtils.calculate_ISO8601(now));
85
						engine.startProcess(process);
86
					} else {
87
						log.debug("Process queue is empty");
88
					}
89
				}
90
			}, i * step, period, TimeUnit.SECONDS);
91
		}
92
	}
93
	
94
	public String startProcess(final String profileId) throws Exception {
95
		return startProcess(profileId, null);
96
	}
97
	
98
	public String startProcess(final String profileId, final Map<String, Object> params) throws Exception {
99
		final WfProfileDescriptor desc = profileToSarasvatiConverter.getSarasvatiWorkflow(profileId);
100
		
101
		if (isPaused()) {
102
			log.warn("Wf " + profileId + " not launched, because WorkflowExecutor is preparing for shutdown");
103
			throw new MSROException("WorkflowExecutor is preparing for shutdown");
104
		}
105
		if (!desc.isReady()) {
106
			log.warn("Wf " + profileId + " not launched, because it is not ready to start");
107
			throw new MSROException("Workflow " + profileId + " is not ready to start");
108
		}
109
		if (pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
110
			log.warn("Wf " + profileId + " not launched, Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
111
        	throw new MSROException("Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
112
        }
113
		
114
		final File tmpFile = File.createTempFile("wftfs", null);
115
		try {
116
			final Graph graph = graphLoader.loadGraph(desc.getWorkflowXml());
117
			final GraphProcess process = new MemGraphProcess(graph);
118
	        final String procId = graphProcessRegistry.registerProcess(process);
119
	        
120
	        graphProcessRegistry.associateProcessWithResource(process, profileId);
121
	        
122
	        process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROCESS_ID, procId);
123
	        process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profileId);
124
	        process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_NAME, desc.getName());
125
	        process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_NAME, graph.getName());
126
	        process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_FAMILY, desc.getType());
127
	        process.getEnv().setAttribute(WorkflowsConstants.SYSTEM_WF_PRIORITY, desc.getPriority());
128
	        
129
	        if (params != null) {
130
	        	for (Map.Entry<String, Object> e : params.entrySet()) {
131
	        		process.getEnv().setAttribute(e.getKey(), e.getValue());
132
	        	} 
133
	        }
134
	        	        
135
	        log.info("Process " + process + " in queue, priority=" + desc.getPriority());
136
	        pendingProcs.put(process);
137
			
138
			return procId;
139
		} catch (Exception e) {
140
			log.error("Error parsing workflow xml: " + desc.getWorkflowXml(), e);
141
			throw new IllegalArgumentException("Error parsing workflow");
142
		} finally {
143
			tmpFile.delete();
144
		}
145
	}
146
	
147
	public void startMetaWorkflow(final String id, final boolean manual) throws Exception {
148
		final String query = "/*[.//RESOURCE_IDENTIFIER/@value='" + id + "']//CONFIGURATION[@status='EXECUTABLE']/WORKFLOW/@id/string()";
149
		
150
		final List<String> list = lookupLocator.getService().quickSearchProfile(query);	
151
		
152
		if (list == null || list.isEmpty()) {
153
			throw new MSROException("Metaworkflow " + id + " not launched");
154
		}
155

    
156
		for (String wfId : list) {
157
			final String q = "/*[.//RESOURCE_IDENTIFIER/@value='" + wfId + "']//CONFIGURATION/@start/string()";
158
			if (manual || lookupLocator.getService().getResourceProfileByQuery(q).equals("auto")) {
159
				startProcess(wfId);
160
			} else {
161
				log.warn("Worflow " + wfId + " can not be launched AUTOMATICALLY");
162
			}
163
		}
164
	}
165
	
166
	public GraphLoader getGraphLoader() {
167
		return graphLoader;
168
	}
169

    
170
	@Required
171
	public void setGraphLoader(GraphLoader graphLoader) {
172
		this.graphLoader = graphLoader;
173
	}
174

    
175
	public MemEngine getEngine() {
176
		return engine;
177
	}
178
	
179
	@Required
180
	public void setEngine(MemEngine engine) {
181
		this.engine = engine;
182
	}
183

    
184
	public GraphProcessRegistry getGraphProcessRegistry() {
185
		return graphProcessRegistry;
186
	}
187

    
188
	@Required
189
	public void setGraphProcessRegistry(GraphProcessRegistry graphProcessRegistry) {
190
		this.graphProcessRegistry = graphProcessRegistry;
191
	}
192

    
193
	public ProfileToSarasvatiConverter getProfileToSarasvatiConverter() {
194
		return profileToSarasvatiConverter;
195
	}
196

    
197
	@Required
198
	public void setProfileToSarasvatiConverter(ProfileToSarasvatiConverter profileToSarasvatiConverter) {
199
		this.profileToSarasvatiConverter = profileToSarasvatiConverter;
200
	}
201

    
202
	@Override
203
	public void stop() {
204
		this.paused = true;
205
	}
206

    
207
	@Override
208
	public void resume() {
209
		this.paused = false;
210
	}
211

    
212
	@Override
213
	public StoppableDetails getStopDetails() {
214
		final int count = graphProcessRegistry.countRunningWfs();
215
		
216
		StoppableDetails.StopStatus status = StopStatus.RUNNING;
217
		if (isPaused()) {
218
			if (count == 0) {
219
				status = StopStatus.STOPPED;
220
			} else {
221
				status = StopStatus.STOPPING;
222
			}
223
		}
224
		graphProcessRegistry.listIdentifiers();
225
		
226
		return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
227
	}
228

    
229

    
230
}
(8-8/8)