Project

General

Profile

1
package eu.dnetlib.msro.workflows.procs;
2

    
3
import java.util.*;
4
import java.util.stream.Collectors;
5

    
6
import com.google.common.base.Throwables;
7
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
8
import eu.dnetlib.miscutils.datetime.DateUtils;
9
import eu.dnetlib.msro.logging.DnetLogger;
10
import eu.dnetlib.msro.notification.EmailDispatcher;
11
import eu.dnetlib.msro.workflows.graph.GraphNode;
12
import eu.dnetlib.msro.workflows.nodes.ProcessNode;
13
import eu.dnetlib.msro.workflows.procs.WorkflowProcess.Status;
14
import eu.dnetlib.msro.workflows.util.NodeHelper;
15
import eu.dnetlib.msro.workflows.util.NodeTokenCallback;
16
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
17
import eu.dnetlib.rmi.enabling.ISRegistryService;
18
import org.antlr.stringtemplate.StringTemplate;
19
import org.apache.commons.io.IOUtils;
20
import org.apache.commons.lang3.StringEscapeUtils;
21
import org.apache.commons.lang3.StringUtils;
22
import org.apache.commons.logging.Log;
23
import org.apache.commons.logging.LogFactory;
24
import org.springframework.beans.factory.annotation.Autowired;
25
import org.springframework.beans.factory.annotation.Required;
26

    
27
/**
28
 * Created by michele on 26/11/15.
29
 */
30
public class ProcessEngine {
31

    
32
	private static final Log log = LogFactory.getLog(ProcessEngine.class);
33

    
34
	@Autowired
35
	private UniqueServiceLocator serviceLocator;
36

    
37
	private NodeHelper nodeHelper;
38
	private DnetLogger dnetLogger;
39
	private EmailDispatcher emailDispatcher;
40

    
41
	public void startProcess(final WorkflowProcess process) {
42
		log.info(process.getGraph());
43

    
44
		log.info("Starting workflow: " + process);
45

    
46
		final long now = DateUtils.now();
47
		process.setStatus(WorkflowProcess.Status.EXECUTING);
48
		process.setStartDate(now);
49
		process.setLastActivityDate(now);
50

    
51
		try {
52
			for (final GraphNode node : process.getGraph().startNodes()) {
53
				final ProcessNode pNode = this.nodeHelper.newProcessNode(node, process);
54
				final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
55

    
56
				token.getEnv().addAttributes(process.getEnv().getAttributes());
57
				process.getTokens().add(token);
58

    
59
				pNode.execute(token);
60
			}
61
		} catch (final Throwable e) {
62
			log.error("WorkflowProcess node instantiation failed", e);
63
			process.setStatus(WorkflowProcess.Status.FAILURE);
64
		}
65
	}
66

    
67
	public void releaseToken(final WorkflowProcess process, final GraphNode oldGraphNode, final Token oldToken) {
68
		process.setLastActivityDate(DateUtils.now());
69

    
70
		try {
71
			for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getNextArc())) {
72
				if (node.isJoin() || node.isSucessNode()) {
73
					if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) {
74
						process.getPausedJoinNodeTokens().put(node.getName(), new ArrayList<Token>());
75
					}
76

    
77
					final List<Token> list = process.getPausedJoinNodeTokens().get(node.getName());
78

    
79
					list.add(oldToken);
80

    
81
					if (list.size() == process.getGraph().getNumberOfIncomingArcs(node)) {
82
						final ProcessNode pNode = this.nodeHelper.newProcessNode(node, process);
83
						final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
84

    
85
						token.getEnv().addAttributes(mergeEnvParams(list.toArray(new Token[list.size()])));
86
						process.getTokens().add(token);
87
						process.setLastActivityDate(DateUtils.now());
88

    
89
						if (node.isSucessNode()) {
90
							markAsCompleted(process, token);
91
						} else {
92
							pNode.execute(token);
93
						}
94
					}
95
				} else {
96
					final ProcessNode pNode = this.nodeHelper.newProcessNode(node, process);
97
					final Token token = new Token(node.getName(), newNodeTokenCallback(process, node));
98
					token.getEnv().addAttributes(oldToken.getEnv().getAttributes());
99
					process.getTokens().add(token);
100
					process.setLastActivityDate(DateUtils.now());
101
					pNode.execute(token);
102
				}
103
			}
104
		} catch (final Throwable e) {
105
			log.error("WorkflowProcess node instantiation failed", e);
106
			process.setStatus(WorkflowProcess.Status.FAILURE);
107
			process.setError(e.getMessage());
108
			process.setErrorStacktrace(Throwables.getStackTraceAsString(e));
109
			process.setLastActivityDate(DateUtils.now());
110
		}
111

    
112
	}
113

    
114
	private NodeTokenCallback newNodeTokenCallback(final WorkflowProcess process, final GraphNode node) {
115
		return new NodeTokenCallback() {
116

    
117
			@Override
118
			public void onSuccess(final Token token) {
119
				releaseToken(process, node, token);
120
			}
121

    
122
			@Override
123
			public void onFail(final Token token) {
124
				completeProcess(process, token);
125
			}
126
		};
127
	}
128

    
129
	private Map<String, Object> mergeEnvParams(final Token... tokens) {
130
		final Map<String, Object> map = new HashMap<String, Object>();
131
		Arrays.stream(tokens).forEach(t -> map.putAll(t.getEnv().getAttributes()));
132
		return map;
133
	}
134

    
135
	private void markAsCompleted(final WorkflowProcess process, final Token token) {
136
		completeProcess(process, token);
137
	}
138

    
139
	private void completeProcess(final WorkflowProcess process, final Token token) {
140
		if (token.isActive()) {
141
			if (StringUtils.isNotBlank(token.getError())) {
142
				token.releaseAsFailed(token.getError());
143
			} else {
144
				token.release();
145
			}
146
		}
147

    
148
		final long now = token.getEndDate();
149

    
150
		process.setLastActivityDate(now);
151
		process.setEndDate(now);
152
		process.setStatus(token.isFailed() ? WorkflowProcess.Status.FAILURE : WorkflowProcess.Status.SUCCESS);
153

    
154
		if (token.isFailed()) {
155
			process.setStatus(Status.FAILURE);
156
			process.setError(token.getError());
157
			process.setErrorStacktrace(token.getErrorStackTrace());
158
			process.setLastActivityDate(DateUtils.now());
159
		}
160

    
161
		if (process.getCallback() != null) {
162
			if (token.isFailed()) {
163
				process.getCallback().onFail();
164
			} else {
165
				process.getCallback().onSuccess();
166
			}
167
		}
168

    
169
		process.setOutputParams(filterOutputParams(token.getEnv().getAttributes()));
170

    
171
		final String profileId = process.getProfileId();
172
		if (!process.isTemplate() && StringUtils.isNotBlank(profileId)) {
173
			try {
174
				final String template = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/msro/workflows/templates/workflow_status.xml.st"));
175
				final StringTemplate st = new StringTemplate(template);
176
				st.setAttribute("procId", StringEscapeUtils.escapeXml11(process.getId()));
177
				st.setAttribute("date", StringEscapeUtils.escapeXml11(DateUtils.calculate_ISO8601(now)));
178
				st.setAttribute("params", process.getOutputParams());
179
				if (process.getStatus() == Status.FAILURE) {
180
					st.setAttribute("error", StringEscapeUtils.escapeXml11(process.getError()));
181
				}
182
				this.serviceLocator.getService(ISRegistryService.class).updateProfileNode(profileId, "//STATUS", st.toString());
183
			} catch (final Exception e) {
184
				log.error("Error updating workflow profile: " + profileId, e);
185
				process.setStatus(WorkflowProcess.Status.FAILURE);
186
				process.setError("Error updating workflow profile: " + profileId);
187
				process.setErrorStacktrace(Throwables.getStackTraceAsString(e));
188
			}
189
		}
190

    
191
		this.dnetLogger.newLogMessage()
192
				.addDetails(process.getOutputParams())
193
				.addDetail(WorkflowsConstants.LOG_WF_NAME, process.getName())
194
				.addDetail(WorkflowsConstants.LOG_WF_FAMILY, process.getFamily())
195
				.addDetail(WorkflowsConstants.LOG_WF_PRIORITY, "" + process.getPriority())
196
				.addDetail(WorkflowsConstants.LOG_WF_PROCESS_ID, process.getId())
197
				.addDetail(WorkflowsConstants.LOG_WF_PROCESS_STATUS, process.getStatus().toString())
198
				.addDetail(WorkflowsConstants.LOG_WF_PROCESS_START_DATE, Long.toString(process.getStartDate()))
199
				.addDetail(WorkflowsConstants.LOG_WF_PROCESS_END_DATE, Long.toString(process.getEndDate()))
200
				.addDetail(WorkflowsConstants.LOG_WF_PROFILE_ID, process.isTemplate() ? null : process.getProfileId())
201
				.addDetail(WorkflowsConstants.LOG_WF_PROFILE_TEMPLATE_ID, process.isTemplate() ? process.getProfileId() : null)
202
				.addDetail(WorkflowsConstants.LOG_DATASOURCE_ID, process.getDsId())
203
				.addDetail(WorkflowsConstants.LOG_DATASOURCE_NAME, process.getDsName())
204
				.addDetail(WorkflowsConstants.LOG_DATASOURCE_INTERFACE, process.getDsInterface())
205
				.addDetail(WorkflowsConstants.LOG_SYSTEM_ERROR, process.getError())
206
				.addDetail(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, process.getErrorStacktrace())
207
				.flush();
208

    
209
		this.emailDispatcher.sendMails(process);
210

    
211
	}
212

    
213
	private Map<String, String> filterOutputParams(final Map<String, Object> map) {
214
		return map == null ? new HashMap<>() : map.entrySet().stream()
215
				.filter(e -> StringUtils.isNotBlank(e.getKey()))
216
				.filter(e -> e.getValue() != null)
217
				.filter(e -> e.getKey().startsWith(WorkflowsConstants.DATASOURCE_PREFIX) || e.getKey().startsWith(WorkflowsConstants.MAIN_LOG_PREFIX))
218
				.collect(Collectors.toMap(
219
						e -> StringEscapeUtils.escapeXml11(e.getKey()),
220
						e -> StringEscapeUtils.escapeXml11(e.getValue().toString())));
221
	}
222

    
223
	public DnetLogger getDnetLogger() {
224
		return this.dnetLogger;
225
	}
226

    
227
	@Required
228
	public void setDnetLogger(final DnetLogger dnetLogger) {
229
		this.dnetLogger = dnetLogger;
230
	}
231

    
232
	public NodeHelper getNodeHelper() {
233
		return this.nodeHelper;
234
	}
235

    
236
	@Required
237
	public void setNodeHelper(final NodeHelper nodeHelper) {
238
		this.nodeHelper = nodeHelper;
239
	}
240

    
241
	public EmailDispatcher getEmailDispatcher() {
242
		return this.emailDispatcher;
243
	}
244

    
245
	@Required
246
	public void setEmailDispatcher(final EmailDispatcher emailDispatcher) {
247
		this.emailDispatcher = emailDispatcher;
248
	}
249

    
250
}
(3-3/8)