Project

General

Profile

1 45395 michele.ar
package eu.dnetlib.msro.workflows.procs;
2
3
import java.util.ArrayList;
4
import java.util.Arrays;
5
import java.util.HashMap;
6
import java.util.List;
7
import java.util.Map;
8
import java.util.stream.Collectors;
9
10
import org.apache.commons.lang3.StringEscapeUtils;
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.springframework.beans.factory.annotation.Autowired;
15
import org.springframework.stereotype.Component;
16
17
import com.google.common.base.Throwables;
18
19
import eu.dnetlib.miscutils.datetime.DateUtils;
20 46387 michele.ar
import eu.dnetlib.msro.workflows.GraphNode;
21 46679 michele.ar
import eu.dnetlib.msro.workflows.ProcessStatus;
22
import eu.dnetlib.msro.workflows.WorkflowsConstants;
23 46425 michele.ar
import eu.dnetlib.msro.workflows.nodes.AbstractParallelProcessNode;
24 45395 michele.ar
import eu.dnetlib.msro.workflows.util.NodeHelper;
25
import eu.dnetlib.msro.workflows.util.NodeTokenCallback;
26 46456 michele.ar
import eu.dnetlib.services.async.AsyncMethodStatus;
27
import eu.dnetlib.services.async.AsyncResponse;
28 45395 michele.ar
29
/**
30
 * Created by michele on 26/11/15.
31
 */
32
@Component
33
public class ProcessEngine {
34
35
	private static final Log log = LogFactory.getLog(ProcessEngine.class);
36
37 46786 michele.ar
	@Autowired
38 45576 michele.ar
	private NodeHelper nodeHelper;
39
40 45395 michele.ar
	public void startProcess(final WorkflowProcess process) {
41
		log.info(process.getGraph());
42
43
		log.info("Starting workflow: " + process);
44
45
		final long now = DateUtils.now();
46 46679 michele.ar
		process.setStatus(ProcessStatus.EXECUTING);
47 45395 michele.ar
		process.setStartDate(now);
48
		process.setLastActivityDate(now);
49
50
		try {
51
			for (final GraphNode node : process.getGraph().startNodes()) {
52 46785 michele.ar
53 46425 michele.ar
				final AbstractParallelProcessNode pNode = nodeHelper.newProcessNode(node, process, process.getEnv());
54 45395 michele.ar
				final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
55
56
				token.getEnv().addAttributes(process.getEnv().getAttributes());
57
				process.getTokens().add(token);
58
59
				pNode.execute(token);
60
			}
61
		} catch (final Throwable e) {
62
			log.error("WorkflowProcess node instantiation failed", e);
63 46679 michele.ar
			process.setStatus(ProcessStatus.FAILURE);
64 45395 michele.ar
		}
65
	}
66
67
	public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
68
		process.setLastActivityDate(DateUtils.now());
69
70
		try {
71
			for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getNextArc())) {
72 46785 michele.ar
				log.info("Executing node: " + node.getName());
73
74 45395 michele.ar
				if (node.isJoin() || node.isSucessNode()) {
75
					if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) {
76
						process.getPausedJoinNodeTokens().put(node.getName(), new ArrayList<Token>());
77
					}
78
79
					final List<Token> list = process.getPausedJoinNodeTokens().get(node.getName());
80
81
					list.add(oldToken);
82
83
					if (list.size() == process.getGraph().getNumberOfIncomingArcs(node)) {
84
						final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
85
						token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()])));
86 46425 michele.ar
						final AbstractParallelProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv());
87 45395 michele.ar
88
						process.getTokens().add(token);
89
						process.setLastActivityDate(DateUtils.now());
90
91
						if (node.isSucessNode()) {
92
							markAsCompleted(process, token);
93
						} else {
94
							pNode.execute(token);
95
						}
96
					}
97
				} else {
98
					final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
99
					token.getEnv().addAttributes(oldToken.getEnv().getAttributes());
100 46425 michele.ar
					final AbstractParallelProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv());
101 45395 michele.ar
102
					process.getTokens().add(token);
103
					process.setLastActivityDate(DateUtils.now());
104
					pNode.execute(token);
105
				}
106
			}
107
		} catch (final Throwable e) {
108
			log.error("WorkflowProcess node instantiation failed", e);
109 46679 michele.ar
			process.setStatus(ProcessStatus.FAILURE);
110 45395 michele.ar
			process.setError(e.getMessage());
111
			process.setErrorStacktrace(Throwables.getStackTraceAsString(e));
112
			process.setLastActivityDate(DateUtils.now());
113
		}
114
115
	}
116
117
	private NodeTokenCallback newNodeTokenCallback(final WorkflowProcess process, final GraphNode node) {
118
		return new NodeTokenCallback() {
119
120
			@Override
121
			public void onSuccess(final Token token) {
122
				releaseToken(process, node, token);
123
			}
124
125
			@Override
126
			public void onFail(final Token token) {
127
				completeProcess(process, token);
128
			}
129
		};
130
	}
131
132
	private Map<String, Object> mergeEnvParams(final Token... tokens) {
133
		final Map<String, Object> map = new HashMap<String, Object>();
134
		Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv().getAttributes()));
135
		return map;
136
	}
137
138
	private void markAsCompleted(final WorkflowProcess process, final Token token) {
139
		completeProcess(process, token);
140
	}
141
142
	private void completeProcess(final WorkflowProcess process, final Token token) {
143
		if (token.isActive()) {
144
			if (StringUtils.isNotBlank(token.getError())) {
145
				token.releaseAsFailed(token.getError());
146
			} else {
147
				token.releaseAsCompleted();
148
			}
149
		}
150
151
		final long now = token.getEndDate();
152
153
		process.setLastActivityDate(now);
154
		process.setEndDate(now);
155 46679 michele.ar
		process.setStatus(token.isFailed() ? ProcessStatus.FAILURE : ProcessStatus.SUCCESS);
156 46456 michele.ar
		process.setOutputParams(filterOutputParams(token.getEnv().getAttributes()));
157 45395 michele.ar
158 46456 michele.ar
		final AsyncResponse asyncResponse = new AsyncResponse();
159
160 45395 michele.ar
		if (token.isFailed()) {
161 46456 michele.ar
			asyncResponse.setStatus(AsyncMethodStatus.FAILED);
162 46679 michele.ar
			process.setStatus(ProcessStatus.FAILURE);
163 45395 michele.ar
			process.setError(token.getError());
164
			process.setErrorStacktrace(token.getErrorStackTrace());
165
			process.setLastActivityDate(DateUtils.now());
166 46456 michele.ar
			process.getOutputParams().put(WorkflowsConstants.LOG_SYSTEM_ERROR, token.getError());
167
			process.getOutputParams().put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, token.getErrorStackTrace());
168
		} else {
169
			asyncResponse.setStatus(AsyncMethodStatus.SUCCESS);
170 45395 michele.ar
		}
171
172 46456 michele.ar
		asyncResponse.prepareResponse(process.getOutputParams());
173 45395 michele.ar
174 46456 michele.ar
		process.getCallback().notify(asyncResponse);
175 45395 michele.ar
176
	}
177
178
	private Map<String, String> filterOutputParams(final Map<String, Object> map) {
179
		return map == null ? new HashMap<>() : map.entrySet().stream()
180
				.filter(e -> StringUtils.isNotBlank(e.getKey()))
181
				.filter(e -> e.getValue() != null)
182
				.filter(e -> e.getKey().startsWith(WorkflowsConstants.DATASOURCE_PREFIX) || e.getKey().startsWith(WorkflowsConstants.MAIN_LOG_PREFIX))
183
				.collect(Collectors.toMap(
184
						e -> StringEscapeUtils.escapeXml11(e.getKey()),
185
						e -> StringEscapeUtils.escapeXml11(e.getValue().toString())));
186
	}
187
188
}