Project

General

Profile

1
package eu.dnetlib.msro.worker;
2

    
3
import java.io.File;
4
import java.io.FileOutputStream;
5
import java.io.StringReader;
6
import java.util.Comparator;
7
import java.util.Map;
8
import java.util.concurrent.PriorityBlockingQueue;
9

    
10
import org.apache.commons.io.IOUtils;
11
import org.apache.commons.lang.math.NumberUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.dom4j.Document;
15
import org.dom4j.DocumentException;
16
import org.dom4j.io.SAXReader;
17
import org.springframework.beans.factory.annotation.Required;
18

    
19
import com.googlecode.sarasvati.Graph;
20

    
21
import eu.dnetlib.common.ifaces.BlackboardExecutionCallback;
22
import eu.dnetlib.common.ifaces.Stoppable;
23
import eu.dnetlib.common.ifaces.StoppableDetails;
24
import eu.dnetlib.common.ifaces.StoppableDetails.StopStatus;
25
import eu.dnetlib.miscutils.DateUtils;
26
import eu.dnetlib.msro.worker.sarasvati.DnetGraphProcess;
27
import eu.dnetlib.msro.worker.sarasvati.InjectableMemEngine;
28
import eu.dnetlib.rmi.blackboard.LaunchWorkflowMessage;
29
import eu.dnetlib.rmi.soap.exceptions.ManagerServiceException;
30

    
31
public class WorkflowLauncher implements Stoppable {
32

    
33
	private InjectableMemEngine engine;
34
	private LocalWorkflowRegistry workflowRegistry;
35

    
36
	private boolean paused = false;
37

    
38
	private static final Log log = LogFactory.getLog(WorkflowLauncher.class);
39

    
40
	private PriorityBlockingQueue<DnetGraphProcess> pendingProcs = new PriorityBlockingQueue<DnetGraphProcess>(20, new Comparator<DnetGraphProcess>() {
41

    
42
		@Override
43
		public int compare(final DnetGraphProcess p1, final DnetGraphProcess p2) {
44
			return NumberUtils.compare(p1.getPriority(), p2.getPriority());
45
		}
46
	});
47

    
48
	public void startsWfsInQueue() {
49
		if (pendingProcs.isEmpty() || isPaused()) { return; }
50

    
51
		int k = WorkflowConstants.MAX_WF_THREADS - workflowRegistry.countRunningWfs();
52

    
53
		for (int i = 0; i < k && !pendingProcs.isEmpty(); i++) {
54
			final DnetGraphProcess process = pendingProcs.poll();
55
			if (process != null) {
56
				log.info("Starting workflow: " + process);
57
				final long now = DateUtils.now();
58
				process.getEnv().setAttribute(WorkflowConstants.SYSTEM_START_DATE, now);
59
				process.getEnv().setAttribute(WorkflowConstants.SYSTEM_START_HUMAN_DATE, DateUtils.getDate_ISO8601(now));
60
				engine.startProcess(process);
61
			} else {
62
				log.debug("Process queue is empty");
63
			}
64
		}
65
	}
66

    
67
	public void enqueueWf(final LaunchWorkflowMessage message,
68
			final Map<String, String> params,
69
			final BlackboardExecutionCallback<LaunchWorkflowMessage> callback)
70
			throws Exception {
71

    
72
		if (isPaused()) {
73
			log.warn("Wf not launched, because WorkflowExecutor is preparing for shutdown");
74
			throw new ManagerServiceException("WorkflowExecutor is preparing for shutdown");
75
		}
76

    
77
		if (pendingProcs.size() > WorkflowConstants.MAX_PENDING_PROCS_SIZE) {
78
			log.warn("Wf not launched, Max number of pending procs reached: " + WorkflowConstants.MAX_PENDING_PROCS_SIZE);
79
			throw new ManagerServiceException("Max number of pending procs reached: " + WorkflowConstants.MAX_PENDING_PROCS_SIZE);
80
		}
81

    
82
		final File tmpFile = File.createTempFile("wftfs", null);
83
		try {
84
			final Graph graph = loadGraph(message.getWfXml());
85
			final DnetGraphProcess process = new DnetGraphProcess(graph, message.getWfPriority(), message, callback);
86
			workflowRegistry.registerProcess(process);
87
			process.getEnv().setAttribute(WorkflowConstants.SYSTEM_WF_PROCESS_ID, message.getProcId());
88
			process.getEnv().setAttribute(WorkflowConstants.SYSTEM_WF_NAME, graph.getName());
89
			process.getEnv().setAttribute(WorkflowConstants.SYSTEM_WF_PRIORITY, message.getWfPriority());
90
			if (params != null) {
91
				for (Map.Entry<String, String> e : params.entrySet()) {
92
					process.getEnv().setAttribute(e.getKey(), e.getValue());
93
				}
94
			}
95

    
96
			log.info("Process " + process + " in queue, priority=" + message.getWfPriority());
97

    
98
			pendingProcs.put(process);
99
		} catch (Exception e) {
100
			log.error("Error parsing workflow xml: " + message.getWfXml(), e);
101
			throw new IllegalArgumentException("Error parsing workflow");
102
		} finally {
103
			tmpFile.delete();
104
		}
105
	}
106

    
107
	private Graph loadGraph(final String xml) throws Exception {
108
		final File tmpFile = File.createTempFile("wftfs", null);
109
		try {
110
			IOUtils.copy(new StringReader(xml), new FileOutputStream(tmpFile));
111

    
112
			final SAXReader reader = new SAXReader();
113
			final Document doc = reader.read(tmpFile);
114
			final String graphName = doc.valueOf("/process-definition/@name");
115

    
116
			getEngine().getLoader().load(tmpFile);
117

    
118
			final Graph graph = getEngine().getRepository().getLatestGraph(graphName);
119
			if (graph == null) { throw new IllegalArgumentException("graph called " + graphName + " doesn't exist"); }
120
			return graph;
121
		} catch (DocumentException e) {
122
			log.error("Error parsing xml: " + xml, e);
123
			throw e;
124
		} finally {
125
			tmpFile.delete();
126
		}
127
	}
128

    
129
	public boolean isPaused() {
130
		return paused;
131
	}
132

    
133
	public void setPaused(final boolean paused) {
134
		this.paused = paused;
135
	}
136

    
137
	@Override
138
	public StoppableDetails getStopDetails() {
139
		final int count = workflowRegistry.countRunningWfs();
140

    
141
		StoppableDetails.StopStatus status = StopStatus.RUNNING;
142
		if (isPaused()) {
143
			status = count == 0 ? StopStatus.STOPPED : StopStatus.STOPPING;
144
		}
145
		// graphProcessRegistry.listProcIds();
146

    
147
		return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
148
	}
149

    
150
	@Override
151
	public void stop() {
152
		this.paused = true;
153
	}
154

    
155
	@Override
156
	public void resume() {
157
		this.paused = false;
158
	}
159

    
160
	public InjectableMemEngine getEngine() {
161
		return engine;
162
	}
163

    
164
	@Required
165
	public void setEngine(final InjectableMemEngine engine) {
166
		this.engine = engine;
167
	}
168

    
169
	public LocalWorkflowRegistry getWorkflowRegistry() {
170
		return workflowRegistry;
171
	}
172

    
173
	@Required
174
	public void setWorkflowRegistry(final LocalWorkflowRegistry workflowRegistry) {
175
		this.workflowRegistry = workflowRegistry;
176
	}
177

    
178
}
(5-5/5)