Project

General

Profile

1 39984 michele.ar
package eu.dnetlib.msro.workflows.nodes;
2
3 41530 michele.ar
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5
import org.springframework.beans.factory.annotation.Autowired;
6
7 40094 michele.ar
import eu.dnetlib.msro.workflows.graph.Arc;
8 44404 michele.ar
import eu.dnetlib.msro.workflows.procs.ProcessAware;
9 42206 michele.ar
import eu.dnetlib.msro.workflows.procs.ProcessRegistry;
10 40123 michele.ar
import eu.dnetlib.msro.workflows.procs.Token;
11 40099 michele.ar
import eu.dnetlib.msro.workflows.procs.WorkflowExecutor;
12 44404 michele.ar
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
13 39984 michele.ar
import eu.dnetlib.msro.workflows.util.ProcessCallback;
14 42206 michele.ar
import eu.dnetlib.msro.workflows.util.SubWorkflowProgressProvider;
15 39984 michele.ar
16
/**
17
 * Created by michele on 18/11/15.
18
 */
19 44404 michele.ar
public class LaunchWorkflowJobNode extends ProcessNode implements ProcessAware {
20 39984 michele.ar
21
	private static final Log log = LogFactory.getLog(LaunchWorkflowJobNode.class);
22
	private String wfId;
23 39992 michele.ar
24 39984 michele.ar
	@Autowired
25
	private WorkflowExecutor executor;
26
27 42206 michele.ar
	@Autowired
28
	private ProcessRegistry processRegistry;
29
30 44404 michele.ar
	private WorkflowProcess process;
31
32 39984 michele.ar
	@Override
33 40094 michele.ar
	public final void execute(final Token token) {
34 39984 michele.ar
35
		try {
36 44404 michele.ar
			final String procId = executor.startWorkflow(getWfId(), new ProcessCallback() {
37 39984 michele.ar
38
				@Override
39
				public void onSuccess() {
40 39992 michele.ar
					log.debug("Child workflow has been completed successfully");
41 40094 michele.ar
					token.setNextArc(Arc.DEFAULT_ARC);
42
					token.release();
43 39984 michele.ar
				}
44
45
				@Override
46
				public void onFail() {
47
					log.error("Child workflow is failed");
48 40115 michele.ar
					token.releaseAsFailed("Child workflow is failed");
49 39984 michele.ar
				}
50 44404 michele.ar
			}, process.getProfileId());
51
52 39992 michele.ar
			if (log.isDebugEnabled()) {
53
				log.debug("The child workflow [" + getWfId() + "] is starting with procId: " + procId);
54
			}
55 42206 michele.ar
56 44404 michele.ar
			token.setProgressProvider(new SubWorkflowProgressProvider(procId, processRegistry));
57 42206 michele.ar
58 41530 michele.ar
		} catch (final Throwable e) {
59 39984 michele.ar
			log.error("got exception while launching child workflow", e);
60 40115 michele.ar
			token.releaseAsFailed(e);
61 39984 michele.ar
		}
62
	}
63
64
	public String getWfId() {
65 44404 michele.ar
		return wfId;
66 39984 michele.ar
	}
67
68
	public void setWfId(final String wfId) {
69
		this.wfId = wfId;
70
	}
71
72 44404 michele.ar
	@Override
73
	public void setProcess(final WorkflowProcess process) {
74
		this.process = process;
75
	}
76
77 39984 michele.ar
}