Project

General

Profile

1
package eu.dnetlib.functionality.modular.ui.workflows.controllers;
2

    
3
import java.awt.image.BufferedImage;
4
import java.io.IOException;
5
import java.io.OutputStream;
6
import java.io.StringReader;
7
import java.util.Collection;
8
import java.util.Collections;
9
import java.util.Iterator;
10
import java.util.List;
11
import java.util.Map;
12
import java.util.Set;
13

    
14
import javax.annotation.Resource;
15
import javax.imageio.ImageIO;
16
import javax.servlet.http.HttpServletResponse;
17

    
18
import org.apache.commons.lang.math.NumberUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.dom4j.Document;
22
import org.dom4j.Element;
23
import org.dom4j.io.SAXReader;
24
import org.joda.time.DateTime;
25
import org.joda.time.format.DateTimeFormat;
26
import org.joda.time.format.DateTimeFormatter;
27
import org.springframework.stereotype.Controller;
28
import org.springframework.web.bind.annotation.RequestMapping;
29
import org.springframework.web.bind.annotation.RequestParam;
30
import org.springframework.web.bind.annotation.ResponseBody;
31

    
32
import com.google.common.base.Function;
33
import com.google.common.collect.Iterators;
34
import com.google.common.collect.Lists;
35
import com.google.common.collect.Maps;
36
import com.google.gson.Gson;
37
import com.google.gson.reflect.TypeToken;
38
import com.googlecode.sarasvati.GraphProcess;
39
import com.googlecode.sarasvati.Node;
40
import com.googlecode.sarasvati.NodeToken;
41
import com.googlecode.sarasvati.ProcessState;
42

    
43
import eu.dnetlib.common.logging.DnetLogger;
44
import eu.dnetlib.common.logging.LogMessage;
45
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
46
import eu.dnetlib.functionality.modular.ui.AbstractAjaxController;
47
import eu.dnetlib.functionality.modular.ui.workflows.objects.AdvancedMetaWorkflowDescriptor;
48
import eu.dnetlib.functionality.modular.ui.workflows.objects.AtomicWorkflowDescriptor;
49
import eu.dnetlib.functionality.modular.ui.workflows.objects.MetaWorkflowDescriptor;
50
import eu.dnetlib.functionality.modular.ui.workflows.objects.NodeInfo;
51
import eu.dnetlib.functionality.modular.ui.workflows.objects.NodeTokenInfo;
52
import eu.dnetlib.functionality.modular.ui.workflows.objects.NodeWithUserParams;
53
import eu.dnetlib.functionality.modular.ui.workflows.objects.ProcessListEntry;
54
import eu.dnetlib.functionality.modular.ui.workflows.objects.sections.WorkflowSectionGrouper;
55
import eu.dnetlib.functionality.modular.ui.workflows.sarasvati.viewer.ProcessGraphGenerator;
56
import eu.dnetlib.functionality.modular.ui.workflows.util.ISLookupClient;
57
import eu.dnetlib.functionality.modular.ui.workflows.util.ISRegistryClient;
58
import eu.dnetlib.miscutils.datetime.DateUtils;
59
import eu.dnetlib.msro.workflows.sarasvati.loader.ProfileToSarasvatiConverter;
60
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
61
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
62
import eu.dnetlib.msro.workflows.util.ProcessUtils;
63
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
64
import eu.dnetlib.msro.workflows.util.WorkflowsConstants.WorkflowStatus;
65

    
66
/**
67
 * Web controller for the UI
68
 *
69
 * @author Michele Artini
70
 */
71

    
72
@Controller
73
public class WorkflowsController extends AbstractAjaxController {
74

    
75
	private final class JournalEntryFunction implements Function<Map<String, String>, ProcessListEntry> {
76

    
77
		@Override
78
		public ProcessListEntry apply(final Map<String, String> input) {
79
			final String name = input.get(WorkflowsConstants.SYSTEM_WF_PROFILE_NAME);
80

    
81
			final String repo = input.containsKey(WorkflowsConstants.DATAPROVIDER_NAME) ? input.get(WorkflowsConstants.DATAPROVIDER_NAME) : "";
82
			final String repoId = input.containsKey(WorkflowsConstants.DATAPROVIDER_ORIGINALID) ? input.get(WorkflowsConstants.DATAPROVIDER_ORIGINALID) : "";
83
			final String apiId = input.containsKey(WorkflowsConstants.DATAPROVIDER_INTERFACE) ? input.get(WorkflowsConstants.DATAPROVIDER_INTERFACE) : "";
84

    
85
			final String procId = input.get(WorkflowsConstants.SYSTEM_WF_PROCESS_ID);
86
			final String wfId = input.get(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
87
			final String family = input.get(WorkflowsConstants.SYSTEM_WF_PROFILE_FAMILY);
88
			final long date = NumberUtils.toLong(input.get(LogMessage.LOG_DATE_FIELD), 0);
89
			final String status = Boolean.valueOf(input.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY)) ? "SUCCESS" : "FAILURE";
90

    
91
			return new ProcessListEntry(procId, wfId, name, family, status, date, repo, repoId, apiId);
92
		}
93
	}
94

    
95
	@Resource
96
	private ISLookupClient isLookupClient;
97

    
98
	@Resource
99
	private ISRegistryClient isRegistryClient;
100

    
101
	@Resource
102
	private GraphProcessRegistry graphProcessRegistry;
103

    
104
	@Resource
105
	private ProcessGraphGenerator processGraphGenerator;
106

    
107
	@Resource
108
	private WorkflowSectionGrouper workflowSectionGrouper;
109

    
110
	@Resource
111
	private WorkflowExecutor workflowExecutor;
112

    
113
	@Resource
114
	private ProfileToSarasvatiConverter profileToSarasvatiConverter;
115

    
116
	@Resource(name = "msroWorkflowLogger")
117
	private DnetLogger dnetLogger;
118

    
119
	private static final Log log = LogFactory.getLog(WorkflowsController.class);
120

    
121
	@RequestMapping("/ui/list_metaworkflows.json")
122
	public @ResponseBody List<MetaWorkflowDescriptor> listMetaWorflowsForSection(@RequestParam(value = "section", required = false) final String sectionName,
123
			@RequestParam(value = "dsId", required = false) final String dsId)
124
			throws ISLookUpException, IOException {
125
		if (sectionName != null) {
126
			return workflowSectionGrouper.listMetaWorflowsForSection(sectionName);
127
		} else if (dsId != null) {
128
			return workflowSectionGrouper.listMetaWorflowsForDatasource(dsId);
129
		} else {
130
			return Lists.newArrayList();
131
		}
132
	}
133

    
134
	@RequestMapping("/ui/wf_metaworkflow.json")
135
	public @ResponseBody AdvancedMetaWorkflowDescriptor getMetaWorkflow(@RequestParam(value = "id", required = true) final String id) throws Exception {
136
		return isLookupClient.getMetaWorkflow(id);
137
	}
138

    
139
	@RequestMapping("/ui/wf_atomic_workflow.json")
140
	public @ResponseBody AtomicWorkflowDescriptor getAtomicWorkflow(@RequestParam(value = "id", required = true) final String id) throws Exception {
141
		final AtomicWorkflowDescriptor wf = isLookupClient.getAtomicWorkflow(id);
142
		final String xml = profileToSarasvatiConverter.getSarasvatiWorkflow(id).getWorkflowXml();
143

    
144
		wf.setMapContent(processGraphGenerator.getWfDescImageMap(id, xml));
145

    
146
		return wf;
147
	}
148

    
149
	@RequestMapping("/ui/wf_atomic_workflow.img")
150
	public void showAtomicWorkflow(final HttpServletResponse response, @RequestParam(value = "id", required = true) final String id) throws Exception {
151

    
152
		final String xml = profileToSarasvatiConverter.getSarasvatiWorkflow(id).getWorkflowXml();
153
		final Set<String> notConfiguredNodes = isLookupClient.getNotConfiguredNodes(id);
154
		final BufferedImage image = processGraphGenerator.getWfDescImage(id, xml, notConfiguredNodes);
155
		sendImage(response, image);
156
	}
157

    
158
	private void sendImage(final HttpServletResponse response, final BufferedImage image) throws IOException {
159
		response.setContentType("image/png");
160
		final OutputStream out = response.getOutputStream();
161
		ImageIO.write(image, "png", out);
162
		out.flush();
163
		out.close();
164
	}
165

    
166
	@RequestMapping("/ui/wf.start")
167
	public @ResponseBody String startWorkflow(@RequestParam(value = "id", required = true) final String id) throws Exception {
168
		return workflowExecutor.startProcess(id);
169
	}
170

    
171
	@RequestMapping("/ui/metawf.start")
172
	public @ResponseBody String startMetaWorkflow(@RequestParam(value = "id", required = true) final String id) throws Exception {
173
		workflowExecutor.startMetaWorkflow(id, true);
174
		return id;
175
	}
176

    
177
	@RequestMapping("/ui/wf_workflow_node.json")
178
	public @ResponseBody NodeInfo workflowNode_info(@RequestParam(value = "wf", required = true) final String wfId,
179
			@RequestParam(value = "node", required = true) final String nodeName) throws ISLookUpException, IOException {
180
		return isLookupClient.getNodeInfo(wfId, nodeName);
181
	}
182

    
183
	@RequestMapping("/ui/wf_metaworkflow.edit")
184
	public @ResponseBody boolean scheduleMetaWorkflow(@RequestParam(value = "json", required = true) final String json) throws Exception {
185

    
186
		final AdvancedMetaWorkflowDescriptor info = new Gson().fromJson(json, AdvancedMetaWorkflowDescriptor.class);
187

    
188
		log.info("Updating workflow " + info.getName());
189

    
190
		final String xml = isLookupClient.getProfile(info.getWfId());
191
		final boolean res = isRegistryClient.updateSarasvatiMetaWorkflow(info.getWfId(), xml, info);
192

    
193
		return res;
194
	}
195

    
196
	@RequestMapping("/ui/clone_metaworkflow.do")
197
	public @ResponseBody String cloneMetaWf(@RequestParam(value = "id", required = true) final String id,
198
			@RequestParam(value = "name", required = true) final String name) throws Exception {
199

    
200
		if (name.trim().length() > 0) {
201
			final String xml = isLookupClient.getProfile(id);
202
			final SAXReader reader = new SAXReader();
203
			final Document doc = reader.read(new StringReader(xml));
204
			doc.selectSingleNode("//METAWORKFLOW_NAME").setText(name);
205
			for (final Object o : doc.selectNodes("//WORKFLOW")) {
206
				final Element n = (Element) o;
207
				final String atomWfXml = isLookupClient.getProfile(n.valueOf("@id"));
208
				final String newAtomWfId = isRegistryClient.registerProfile(atomWfXml);
209
				n.addAttribute("id", newAtomWfId);
210
			}
211
			return isRegistryClient.registerProfile(doc.asXML());
212
		} else {
213
			throw new IllegalArgumentException("Name is empty");
214
		}
215
	}
216

    
217
	@RequestMapping("/ui/wf_proc_node.json")
218
	public @ResponseBody NodeTokenInfo getProcessWorkflowNode(@RequestParam(value = "id", required = true) final String pid,
219
			@RequestParam(value = "node", required = true) final long nid) throws Exception {
220

    
221
		final NodeToken token = findNodeToken(pid, nid);
222

    
223
		final NodeTokenInfo info = token == null ? new NodeTokenInfo(findNodeName(pid, nid)) : new NodeTokenInfo(token);
224

    
225
		return info;
226
	}
227

    
228
	private NodeToken findNodeToken(final String pid, final long nid) {
229
		final GraphProcess process = graphProcessRegistry.findProcess(pid);
230
		if (process != null) {
231
			for (final NodeToken token : process.getNodeTokens()) {
232
				if (token.getNode().getId() == nid) { return token; }
233
			}
234
		}
235
		return null;
236
	}
237

    
238
	private String findNodeName(final String pid, final long nid) {
239
		final GraphProcess process = graphProcessRegistry.findProcess(pid);
240
		if (process != null) {
241
			for (final Node node : process.getGraph().getNodes()) {
242
				if (node.getId() == nid) { return node.getName(); }
243
			}
244
		}
245
		return "-";
246
	}
247

    
248
	@RequestMapping("/ui/wf_proc.img")
249
	public void showProcessWorkflow(final HttpServletResponse response, @RequestParam(value = "id", required = true) final String id) throws Exception {
250
		final BufferedImage image = processGraphGenerator.getProcessImage(id);
251
		sendImage(response, image);
252
	}
253

    
254
	@RequestMapping("/ui/wf_proc.kill")
255
	public @ResponseBody boolean killProcessWorkflow(@RequestParam(value = "id", required = true) final String id) throws Exception {
256
		final GraphProcess proc = graphProcessRegistry.findProcess(id);
257
		proc.setState(ProcessState.Canceled);
258
		return true;
259
	}
260

    
261
	@RequestMapping("/ui/wf_journal.range")
262
	public @ResponseBody Collection<ProcessListEntry> rangeWfJournal(@RequestParam(value = "start", required = true) final String start,
263
			@RequestParam(value = "end", required = true) final String end) throws Exception {
264

    
265
		final Map<String, ProcessListEntry> res = Maps.newHashMap();
266

    
267
		final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd");
268
		final DateTime startDate = formatter.parseDateTime(start);
269
		final DateTime endDate = formatter.parseDateTime(end).plusHours(23).plusMinutes(59).plusSeconds(59);
270

    
271
		final Iterator<ProcessListEntry> iter = Iterators.transform(dnetLogger.range(startDate.toDate(), endDate.toDate()), new JournalEntryFunction());
272
		while (iter.hasNext()) {
273
			final ProcessListEntry e = iter.next();
274
			res.put(e.getProcId(), e);
275
		}
276

    
277
		final long now = DateUtils.now();
278
		if (startDate.isBefore(now) && endDate.isAfter(now)) {
279
			for (final String pid : graphProcessRegistry.listIdentifiers()) {
280
				final GraphProcess proc = graphProcessRegistry.findProcess(pid);
281
				res.put(pid, new ProcessListEntry(pid, proc));
282
			}
283
		}
284

    
285
		return res.values();
286

    
287
	}
288

    
289
	@RequestMapping("/ui/wf_journal.find")
290
	public @ResponseBody Collection<ProcessListEntry> findWfJournal(@RequestParam(value = "wfs", required = true) final String wfs) {
291
		final Map<String, ProcessListEntry> res = Maps.newHashMap();
292

    
293
		final Set<String> wfFilter = new Gson().fromJson(wfs, new TypeToken<Set<String>>() {}.getType());
294

    
295
		for (final String wfId : wfFilter) {
296
			final Iterator<ProcessListEntry> iter =
297
					Iterators.transform(dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, wfId), new JournalEntryFunction());
298
			while (iter.hasNext()) {
299
				final ProcessListEntry e = iter.next();
300
				res.put(e.getProcId(), e);
301
			}
302
		}
303

    
304
		for (final String pid : graphProcessRegistry.listIdentifiers()) {
305
			final GraphProcess proc = graphProcessRegistry.findProcess(pid);
306
			if (wfFilter.contains(ProcessUtils.calculateWfId(proc))) {
307
				res.put(pid, new ProcessListEntry(pid, proc));
308
			}
309
		}
310

    
311
		return res.values();
312
	}
313

    
314
	@RequestMapping("/ui/wf_journal_byFamily.find")
315
	public @ResponseBody Collection<ProcessListEntry> findWfJournalByFamily(@RequestParam(value = "family", required = true) final String family)
316
			throws IOException {
317
		final Iterator<ProcessListEntry> iter =
318
				Iterators.transform(dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_FAMILY, family), new JournalEntryFunction());
319
		return Lists.newArrayList(iter);
320
	}
321

    
322
	@RequestMapping("/ui/wf_journal.get")
323
	public @ResponseBody Map<String, Object> getWfJournalLog(@RequestParam(value = "id", required = true) final String id) throws Exception {
324
		final Map<String, Object> res = Maps.newHashMap();
325

    
326
		final Map<String, String> logs = dnetLogger.findOne("system:processId", id);
327

    
328
		if (logs != null && !logs.isEmpty()) {
329
			final List<String> keys = Lists.newArrayList(logs.keySet());
330
			Collections.sort(keys);
331

    
332
			final List<Map<String, String>> journalEntry = Lists.newArrayList();
333
			for (final String k : keys) {
334
				final Map<String, String> m = Maps.newHashMap();
335
				m.put("name", k);
336
				m.put("value", logs.get(k));
337
				journalEntry.add(m);
338
			}
339
			res.put("journal", journalEntry);
340
		}
341

    
342
		final GraphProcess process = graphProcessRegistry.findProcess(id);
343

    
344
		if (process != null) {
345
			final String mapContent = process.getState() == ProcessState.Created ? "" : processGraphGenerator.getProcessImageMap(id);
346

    
347
			String status = "";
348
			if (!process.isComplete()) {
349
				status = process.getState().toString().toUpperCase();
350
			} else if ("true".equals(process.getEnv().getAttribute(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
351
				status = "SUCCESS";
352
			} else {
353
				status = "FAILURE";
354
			}
355

    
356
			final String img =
357
					process.getState() == ProcessState.Created ? "../resources/img/notStarted.gif" : "wf_proc.img?id=" + id + "&t=" + DateUtils.now();
358

    
359
			final String name = process.getGraph().getName();
360

    
361
			final long startDate = NumberUtils.toLong(process.getEnv().getAttribute(WorkflowsConstants.SYSTEM_START_DATE), 0);
362
			final long endDate = NumberUtils.toLong(process.getEnv().getAttribute(WorkflowsConstants.SYSTEM_END_DATE), 0);
363

    
364
			final AtomicWorkflowDescriptor wf = new AtomicWorkflowDescriptor(id, name, status, mapContent, img, true, "auto", "RUNNING", startDate, endDate);
365

    
366
			res.put("graph", wf);
367
		}
368

    
369
		return res;
370
	}
371

    
372
	@RequestMapping("/ui/wf_atomic_workflow.enable")
373
	public @ResponseBody String enableAtomicWf(@RequestParam(value = "id", required = true) final String id,
374
			@RequestParam(value = "start", required = true) final String value) throws Exception {
375
		isRegistryClient.configureWorkflowStart(id, value);
376

    
377
		return value;
378
	}
379

    
380
	@RequestMapping("/ui/workflow_user_params.json")
381
	public @ResponseBody List<NodeWithUserParams> listWorkflowUserParams(@RequestParam(value = "wf", required = true) final String wfId) throws Exception {
382
		return isLookupClient.listWorkflowUserParams(wfId);
383
	}
384

    
385
	@RequestMapping(value = "/ui/save_user_params.do")
386
	public @ResponseBody boolean saveWorkflowUserParams(@RequestParam(value = "wf", required = true) final String wfId,
387
			@RequestParam(value = "params", required = true) final String jsonParams) throws Exception {
388

    
389
		final String xml = isLookupClient.getProfile(wfId);
390

    
391
		final List<NodeWithUserParams> params = new Gson().fromJson(jsonParams, new TypeToken<List<NodeWithUserParams>>() {}.getType());
392

    
393
		final boolean res = isRegistryClient.updateSarasvatiWorkflow(wfId, xml, params);
394

    
395
		for (final String metaWfId : isLookupClient.listMetaWorflowsForWfId(wfId)) {
396
			if (isLookupClient.isExecutable(metaWfId)) {
397
				isRegistryClient.updateMetaWorkflowStatus(metaWfId, WorkflowStatus.EXECUTABLE);
398
			} else {
399
				isRegistryClient.updateMetaWorkflowStatus(metaWfId, WorkflowStatus.WAIT_USER_SETTINGS);
400
			}
401
		}
402

    
403
		return res;
404
	}
405

    
406
}
(5-5/5)