Project

General

Profile

1
package eu.dnetlib.msro.workflows.procs;
2

    
3
import java.util.ArrayList;
4
import java.util.Arrays;
5
import java.util.HashMap;
6
import java.util.List;
7
import java.util.Map;
8
import java.util.stream.Collectors;
9

    
10
import org.apache.commons.lang3.StringEscapeUtils;
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.springframework.beans.factory.annotation.Autowired;
15
import org.springframework.stereotype.Component;
16

    
17
import com.google.common.base.Throwables;
18

    
19
import eu.dnetlib.miscutils.datetime.DateUtils;
20
import eu.dnetlib.msro.workflows.GraphNode;
21
import eu.dnetlib.msro.workflows.ProcessStatus;
22
import eu.dnetlib.msro.workflows.WorkflowsConstants;
23
import eu.dnetlib.msro.workflows.nodes.AbstractProcessNode;
24
import eu.dnetlib.msro.workflows.util.NodeHelper;
25
import eu.dnetlib.msro.workflows.util.NodeTokenCallback;
26
import eu.dnetlib.services.async.AsyncMethodStatus;
27
import eu.dnetlib.services.async.AsyncResponse;
28

    
29
/**
30
 * Created by michele on 26/11/15.
31
 */
32
@Component
33
public class ProcessEngine {
34

    
35
	private static final Log log = LogFactory.getLog(ProcessEngine.class);
36

    
37
	@Autowired
38
	private NodeHelper nodeHelper;
39

    
40
	public void startProcess(final WorkflowProcess process) {
41
		log.info(process.getGraph());
42

    
43
		log.info("Starting workflow: " + process);
44

    
45
		final long now = DateUtils.now();
46
		process.setStatus(ProcessStatus.EXECUTING);
47
		process.setStartDate(now);
48
		process.setLastActivityDate(now);
49

    
50
		try {
51
			for (final GraphNode node : process.getGraph().startNodes()) {
52

    
53
				final AbstractProcessNode pNode = nodeHelper.newProcessNode(node, process, process.getEnv());
54
				final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
55

    
56
				token.getEnv().addAttributes(process.getEnv().getAttributes());
57
				process.getTokens().add(token);
58

    
59
				pNode.execute(token);
60
			}
61
		} catch (final Throwable e) {
62
			log.error("WorkflowProcess node instantiation failed", e);
63
			process.setStatus(ProcessStatus.FAILURE);
64
		}
65
	}
66

    
67
	public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
68
		process.setLastActivityDate(DateUtils.now());
69

    
70
		try {
71
			for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getNextArc())) {
72
				log.info("Executing node: " + node.getName());
73

    
74
				if (node.isJoin() || node.isSucessNode()) {
75
					if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) {
76
						process.getPausedJoinNodeTokens().put(node.getName(), new ArrayList<Token>());
77
					}
78

    
79
					final List<Token> list = process.getPausedJoinNodeTokens().get(node.getName());
80

    
81
					list.add(oldToken);
82

    
83
					if (list.size() == process.getGraph().getNumberOfIncomingArcs(node)) {
84
						final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
85
						token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()])));
86
						final AbstractProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv());
87

    
88
						process.getTokens().add(token);
89
						process.setLastActivityDate(DateUtils.now());
90

    
91
						if (node.isSucessNode()) {
92
							markAsCompleted(process, token);
93
						} else {
94
							pNode.execute(token);
95
						}
96
					}
97
				} else {
98
					final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
99
					token.getEnv().addAttributes(oldToken.getEnv().getAttributes());
100
					final AbstractProcessNode pNode = nodeHelper.newProcessNode(node, process, token.getEnv());
101

    
102
					process.getTokens().add(token);
103
					process.setLastActivityDate(DateUtils.now());
104
					pNode.execute(token);
105
				}
106
			}
107
		} catch (final Throwable e) {
108
			log.error("WorkflowProcess node instantiation failed", e);
109
			process.setStatus(ProcessStatus.FAILURE);
110
			process.setError(e.getMessage());
111
			process.setErrorStacktrace(Throwables.getStackTraceAsString(e));
112
			process.setLastActivityDate(DateUtils.now());
113
		}
114

    
115
	}
116

    
117
	private NodeTokenCallback newNodeTokenCallback(final WorkflowProcess process, final GraphNode node) {
118
		return new NodeTokenCallback() {
119

    
120
			@Override
121
			public void onSuccess(final Token token) {
122
				releaseToken(process, node, token);
123
			}
124

    
125
			@Override
126
			public void onFail(final Token token) {
127
				completeProcess(process, token);
128
			}
129
		};
130
	}
131

    
132
	private Map<String, Object> mergeEnvParams(final Token... tokens) {
133
		final Map<String, Object> map = new HashMap<String, Object>();
134
		Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv().getAttributes()));
135
		return map;
136
	}
137

    
138
	private void markAsCompleted(final WorkflowProcess process, final Token token) {
139
		completeProcess(process, token);
140
	}
141

    
142
	private void completeProcess(final WorkflowProcess process, final Token token) {
143
		if (token.isActive()) {
144
			if (StringUtils.isNotBlank(token.getError())) {
145
				token.releaseAsFailed(token.getError());
146
			} else {
147
				token.releaseAsCompleted();
148
			}
149
		}
150

    
151
		final long now = token.getEndDate();
152

    
153
		process.setLastActivityDate(now);
154
		process.setEndDate(now);
155
		process.setStatus(token.isFailed() ? ProcessStatus.FAILURE : ProcessStatus.SUCCESS);
156
		process.setOutputParams(filterOutputParams(token.getEnv().getAttributes()));
157

    
158
		final AsyncResponse asyncResponse = new AsyncResponse();
159

    
160
		if (token.isFailed()) {
161
			asyncResponse.setStatus(AsyncMethodStatus.FAILED);
162
			process.setStatus(ProcessStatus.FAILURE);
163
			process.setError(token.getError());
164
			process.setErrorStacktrace(token.getErrorStackTrace());
165
			process.setLastActivityDate(DateUtils.now());
166
			process.getOutputParams().put(WorkflowsConstants.LOG_SYSTEM_ERROR, token.getError());
167
			process.getOutputParams().put(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, token.getErrorStackTrace());
168
		} else {
169
			asyncResponse.setStatus(AsyncMethodStatus.SUCCESS);
170
		}
171

    
172
		asyncResponse.prepareResponse(process.getOutputParams());
173

    
174
		process.getCallback().notify(asyncResponse);
175

    
176
	}
177

    
178
	private Map<String, String> filterOutputParams(final Map<String, Object> map) {
179
		return map == null ? new HashMap<>() : map.entrySet().stream()
180
				.filter(e -> StringUtils.isNotBlank(e.getKey()))
181
				.filter(e -> e.getValue() != null)
182
				.filter(e -> e.getKey().startsWith(WorkflowsConstants.DATASOURCE_PREFIX) || e.getKey().startsWith(WorkflowsConstants.MAIN_LOG_PREFIX))
183
				.collect(Collectors.toMap(
184
						e -> StringEscapeUtils.escapeXml11(e.getKey()),
185
						e -> StringEscapeUtils.escapeXml11(e.getValue().toString())));
186
	}
187

    
188
}
(3-3/8)