Project

General

Profile

1
package eu.dnetlib.msro.workflows.util;
2

    
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.HashMap;
6
import java.util.List;
7
import java.util.Map;
8

    
9
import org.antlr.stringtemplate.StringTemplate;
10
import org.apache.commons.io.IOUtils;
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.lang3.math.NumberUtils;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.dom4j.Document;
16
import org.dom4j.Element;
17
import org.dom4j.io.SAXReader;
18
import org.springframework.beans.factory.annotation.Autowired;
19
import org.springframework.stereotype.Component;
20

    
21
import com.fasterxml.jackson.core.type.TypeReference;
22
import com.fasterxml.jackson.databind.ObjectMapper;
23

    
24
import eu.dnetlib.clients.is.InformationServiceClient;
25
import eu.dnetlib.clients.locators.ServiceClientFactory;
26
import eu.dnetlib.clients.msro.MsroWorkerClient;
27
import eu.dnetlib.clients.msro.Workflow;
28
import eu.dnetlib.clients.msro.WorkflowTemplate;
29
import eu.dnetlib.exceptions.DnetGenericRuntimeException;
30
import eu.dnetlib.miscutils.collections.Pair;
31
import eu.dnetlib.msro.exceptions.MSROException;
32
import eu.dnetlib.msro.workflows.WorkflowInstance;
33
import eu.dnetlib.msro.workflows.WorkflowRef;
34
import eu.dnetlib.msro.workflows.util.logs.WorkflowLog;
35
import eu.dnetlib.msro.workflows.util.logs.WorkflowLogRepository;
36
import eu.dnetlib.services.async.AsyncClientCallback;
37
import eu.dnetlib.services.async.AsyncResponse;
38
import eu.dnetlib.services.async.AsyncServerCallback;
39

    
40
@Component
41
public class WorkflowDispatcher {
42

    
43
	private static final Log log = LogFactory.getLog(WorkflowDispatcher.class);
44

    
45
	@Autowired
46
	private InformationServiceClient isClient;
47

    
48
	@Autowired
49
	private WorkflowLogRepository wfLogRepository;
50

    
51
	@Autowired
52
	private GraphLoader graphLoader;
53

    
54
	@Autowired
55
	private ServiceClientFactory clientFactory;
56

    
57
	public static final int DEFAULT_PRIORITY = 50;
58

    
59
	public WorkflowRef startWorkflow(final Workflow wf, final String localBaseUrl, final AsyncServerCallback callback) throws MSROException {
60
		return startWorkflow(wf.getId(), wf.getParent(), wf.getDsId(), wf.getIface(), localBaseUrl, callback);
61
	}
62

    
63
	public WorkflowRef startWorkflow(final String profileId,
64
			final String parent,
65
			final String dsId,
66
			final String iface,
67
			final String localBaseUrl,
68
			final AsyncServerCallback callback) throws MSROException {
69
		try {
70
			final WorkflowInstance wf = newWorkflowInstance(profileId);
71

    
72
			// override
73
			if (StringUtils.isNotBlank(parent)) {
74
				wf.setParent(parent);
75
			}
76
			if (StringUtils.isNotBlank(dsId)) {
77
				wf.setDsId(dsId);
78
			}
79
			if (StringUtils.isNotBlank(iface)) {
80
				wf.setDsInterface(iface);
81
			}
82
			// end
83
			return executeWf(wf, localBaseUrl, callback);
84
		} catch (final Exception e) {
85
			log.error("Error parsing workflow: " + profileId, e);
86
			throw new MSROException("Error parsing workflow");
87
		}
88
	}
89

    
90
	private WorkflowInstance newWorkflowInstance(final String profileId) throws Exception {
91
		final String profile = isClient.getProfile(profileId);
92
		final Document doc = new SAXReader().read(new StringReader(profile));
93

    
94
		final boolean isReady = doc.valueOf("//CONFIGURATION/@status").equals(WorkflowsConstants.WorkflowStatus.EXECUTABLE.toString());
95
		final boolean isDisabled = doc.valueOf("//CONFIGURATION/@start").equals("disabled");
96
		if (!isReady || isDisabled) {
97
			log.warn("Wf not launched, because it is not ready to start or it is disabled");
98
			throw new MSROException("Workflow is not ready to start");
99
		}
100

    
101
		final Map<String, String> globalParams = new HashMap<String, String>();
102
		for (final Object o : doc.selectNodes("//CONFIGURATION/PARAMETERS/PARAM")) {
103
			final Element p = (Element) o;
104
			globalParams.put(p.valueOf("@name"), p.getTextTrim());
105
		}
106

    
107
		final WorkflowInstance wf = new WorkflowInstance();
108
		wf.setProfileId(profileId);
109
		wf.setName(doc.valueOf("//WORKFLOW_NAME"));
110
		wf.setFamily(doc.valueOf("//WORKFLOW_FAMILY"));
111
		wf.setPriority(NumberUtils.toInt("//WORKFLOW_PRIORITY", DEFAULT_PRIORITY));
112
		wf.setDsId(doc.valueOf("//DATASOURCE/@id"));
113
		wf.setDsInterface(doc.valueOf("//DATASOURCE/@interface"));
114
		wf.setTemplate(false);
115
		wf.setGraph(graphLoader.loadGraph(doc, globalParams));
116
		wf.setGlobalParams(globalParams);
117

    
118
		return wf;
119
	}
120

    
121
	public WorkflowRef startWorkflowTemplate(final WorkflowTemplate wf,
122
			final String localBaseUrl,
123
			final AsyncServerCallback callback) throws MSROException {
124
		return startWorkflowTemplate(wf.getId(), wf.getParent(), wf.getName(), wf.getFamily(), wf.getDsId(), wf.getIface(), wf.getParams(), localBaseUrl,
125
				callback);
126
	}
127

    
128
	public WorkflowRef startWorkflowTemplate(final String profileId,
129
			final String parent,
130
			final String name,
131
			final String family,
132
			final String dsId,
133
			final String iface,
134
			final Map<String, String> params,
135
			final String localBaseUrl,
136
			final AsyncServerCallback callback) throws MSROException {
137

    
138
		try {
139
			final String profile = isClient.getProfile(profileId);
140
			final Document doc = new SAXReader().read(new StringReader(profile));
141

    
142
			final Map<String, String> globalParams = new HashMap<String, String>();
143
			for (final Object o : doc.selectNodes("//CONFIGURATION/PARAMETERS/PARAM")) {
144
				final Element p = (Element) o;
145
				final String pname = p.valueOf("@name");
146
				if (StringUtils.isNotBlank(params.get(pname))) {
147
					globalParams.put(pname, params.get(pname));
148
				} else if (p.selectSingleNode("@default") != null) {
149
					globalParams.put(pname, p.valueOf("@default"));
150
				} else if (!StringUtils.equalsIgnoreCase(p.valueOf("@required"), "true")) {
151
					globalParams.put(pname, "");
152
				} else {
153
					throw new MSROException("A required parameter is missing in wf template:" + pname);
154
				}
155
			}
156

    
157
			final WorkflowInstance wf = new WorkflowInstance();
158
			wf.setName(name);
159
			wf.setFamily(family);
160
			wf.setDsId(dsId);
161
			wf.setDsInterface(iface);
162
			wf.setGraph(graphLoader.loadGraph(doc, globalParams));
163
			wf.setPriority(DEFAULT_PRIORITY);
164
			wf.setProfileId(profileId);
165
			wf.setTemplate(true);
166
			wf.setGlobalParams(globalParams);
167
			wf.setParent(parent);
168

    
169
			return executeWf(wf, localBaseUrl, callback);
170
		} catch (final Exception e) {
171
			log.error("Error starting workflow template: " + profileId, e);
172
			throw new MSROException("Error starting workflow template", e);
173
		}
174
	}
175

    
176
	private WorkflowRef executeWf(final WorkflowInstance wf, final String localBaseUrl, final AsyncServerCallback callback) {
177

    
178
		// TODO Selezionare il worker in base alle caratterisiche del wf
179
		final MsroWorkerClient worker = clientFactory.getClient(MsroWorkerClient.class);
180

    
181
		final WorkflowRef ref = worker.execute(wf, localBaseUrl, new AsyncClientCallback() {
182

    
183
			private ObjectMapper mapper = new ObjectMapper();
184

    
185
			@Override
186
			public void onGoing(final AsyncResponse res) {
187
				log.info("ONGOING - " + res.getResponseJson());
188

    
189
				if (callback != null) {
190
					callback.notify(res);
191
				}
192
			}
193

    
194
			@Override
195
			public void onFailed(final AsyncResponse res) {
196
				log.info("FAILED - " + res.getResponseJson());
197
				try {
198
					saveResponse(wf, false, worker.getBaseUrl(), mapper.readValue(res.getResponseJson(), new TypeReference<List<Pair<String, String>>>() {}));
199

    
200
					if (callback != null) {
201
						callback.notify(res);
202
					}
203
				} catch (final IOException e) {
204
					log.error("Error parsing async response: " + res.getResponseJson(), e);
205
					throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e);
206
				}
207
			}
208

    
209
			@Override
210
			public void onDone(final AsyncResponse res) {
211
				log.info("DONE - " + res.getResponseJson());
212
				try {
213
					saveResponse(wf, true, worker.getBaseUrl(), mapper.readValue(res.getResponseJson(), new TypeReference<List<Pair<String, String>>>() {}));
214

    
215
					if (callback != null) {
216
						callback.notify(res);
217
					}
218
				} catch (final IOException e) {
219
					log.error("Error parsing async response: " + res.getResponseJson(), e);
220
					throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e);
221
				}
222
			}
223
		});
224

    
225
		log.info("Assigned wf " + wf.getProfileId() + " to worker " + worker.getBaseUrl() + ", procId: " + ref.getProcId());
226

    
227
		return ref;
228
	}
229

    
230
	private void saveResponse(final WorkflowInstance wf, final boolean success, final String workerUrl, final List<Pair<String, String>> res) {
231
		updateWfProfile(wf, success, workerUrl, res);
232
		updateJournal(wf, success, workerUrl, res);
233
		sendMail(wf, success, workerUrl, res);
234
	}
235

    
236
	private void sendMail(final WorkflowInstance wf, final boolean success, final String workerUrl, final List<Pair<String, String>> res) {
237
		log.info("Sending mail to ...  TODO");
238

    
239
		/*
240
		 * TODO SEND MAIL emailDispatcher.sendMails(process);
241
		 */
242
	}
243

    
244
	private void updateJournal(final WorkflowInstance wf, final boolean success, final String workerUrl, final List<Pair<String, String>> res) {
245

    
246
		final WorkflowLog wfLog = new WorkflowLog();
247
		wfLog.setName(wf.getName());
248
		wfLog.setFamily(wf.getFamily());
249
		wfLog.setPriority(wf.getPriority());
250
		wfLog.setTemplate(wf.isTemplate());
251
		wfLog.setProfileId(wf.getProfileId());
252
		wfLog.setParent(wf.getParent());
253
		wfLog.setDatasourceId(wf.getDsId());
254
		wfLog.setDatasourceApi(wf.getDsInterface());
255
		wfLog.setSuccess(success);
256
		wfLog.setWorkerUrl(workerUrl);
257

    
258
		for (final Pair<String, String> p : res) {
259

    
260
			// TODO: molti dei parametri seguenti sono assenti: controllare la risposta
261
			log.info("*********** " + p.getKey() + "->" + p.getValue());
262

    
263
			if (p.getKey().equals(WorkflowsConstants.LOG_WF_PROCESS_ID)) {
264
				wfLog.setProcId(p.getValue());
265
			} else if (p.getKey().equals(WorkflowsConstants.LOG_DATASOURCE_NAME)) {
266
				wfLog.setDatasourceName(p.getValue());
267
			} else if (p.getKey().equals(WorkflowsConstants.LOG_WF_PROCESS_START_DATE)) {
268
				wfLog.setStartDate(NumberUtils.toLong(p.getValue(), 0));
269
			} else if (p.getKey().equals(WorkflowsConstants.LOG_WF_PROCESS_END_DATE)) {
270
				wfLog.setEndDate(NumberUtils.toLong(p.getValue(), 0));
271
			} else if (p.getKey().equals(WorkflowsConstants.LOG_SYSTEM_ERROR) && !success) {
272
				wfLog.setError(p.getValue());
273
			} else if (p.getKey().equals(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE) && !success) {
274
				wfLog.setStacktrace(p.getValue());
275
			} else {
276
				wfLog.getOutputParams().put(p.getKey(), p.getValue());
277
			}
278

    
279
		}
280

    
281
		log.info("Adding " + wfLog + " to wfjournal");
282

    
283
		wfLogRepository.save(wfLog);
284

    
285
	}
286

    
287
	private void updateWfProfile(final WorkflowInstance wf, final boolean success, final String workerUrl, final List<Pair<String, String>> res) {
288
		if (!wf.isTemplate() && StringUtils.isNotBlank(wf.getProfileId())) {
289
			try {
290
				final String template = IOUtils.toString(getClass().getResourceAsStream("/templates/workflow_status.xml.st"));
291
				final StringTemplate st = new StringTemplate(template);
292

    
293
				for (final Pair<String, String> p : res) {
294
					if (p.getKey().equals(WorkflowsConstants.LOG_WF_PROCESS_ID)) {
295
						st.setAttribute("procId", p.getValue());
296
					} else if (p.getKey().equals(WorkflowsConstants.LOG_WF_PROCESS_END_DATE)) {
297
						st.setAttribute("date", p.getValue());
298
					} else if (p.getKey().equals(WorkflowsConstants.LOG_SYSTEM_ERROR) && !success) {
299
						st.setAttribute("error", p.getValue());
300
					}
301
				}
302

    
303
				st.setAttribute("params", res);
304

    
305
				log.info("Updating wf profile " + wf.getProfileId());
306

    
307
				isClient.updateProfileNode(wf.getProfileId(), "//STATUS", st.toString());
308

    
309
			} catch (final Exception e) {
310
				log.error("Error saving wf response", e);
311
				throw new DnetGenericRuntimeException("Error saving wf response", e);
312
			}
313
		}
314

    
315
	}
316

    
317
}
(3-3/4)