Project

General

Profile

« Previous | Next » 

Revision 46794

wf execution

View differences:

WorkflowDispatcher.java
1 1
package eu.dnetlib.msro.workflows.util;
2 2

  
3
import java.io.IOException;
3 4
import java.io.StringReader;
4 5
import java.util.HashMap;
5 6
import java.util.Map;
6 7

  
8
import org.antlr.stringtemplate.StringTemplate;
9
import org.apache.commons.io.IOUtils;
7 10
import org.apache.commons.lang3.StringUtils;
8 11
import org.apache.commons.lang3.math.NumberUtils;
9 12
import org.apache.commons.logging.Log;
......
14 17
import org.springframework.beans.factory.annotation.Autowired;
15 18
import org.springframework.stereotype.Component;
16 19

  
20
import com.fasterxml.jackson.core.type.TypeReference;
21
import com.fasterxml.jackson.databind.ObjectMapper;
22

  
17 23
import eu.dnetlib.clients.is.InformationServiceClient;
18 24
import eu.dnetlib.clients.locators.ServiceClientFactory;
19 25
import eu.dnetlib.clients.msro.MsroWorkerClient;
20 26
import eu.dnetlib.clients.msro.Workflow;
21 27
import eu.dnetlib.clients.msro.WorkflowTemplate;
28
import eu.dnetlib.exceptions.DnetGenericRuntimeException;
29
import eu.dnetlib.miscutils.datetime.DateUtils;
22 30
import eu.dnetlib.msro.exceptions.MSROException;
23 31
import eu.dnetlib.msro.workflows.WorkflowInstance;
24 32
import eu.dnetlib.services.async.AsyncClientCallback;
......
30 38
	private static final Log log = LogFactory.getLog(WorkflowDispatcher.class);
31 39

  
32 40
	@Autowired
33
	private InformationServiceClient isLookup;
41
	private InformationServiceClient isClient;
34 42

  
35 43
	@Autowired
36 44
	private GraphLoader graphLoader;
......
71 79
	}
72 80

  
73 81
	private WorkflowInstance newWorkflowInstance(final String profileId) throws Exception {
74
		final String profile = isLookup.getProfile(profileId);
82
		final String profile = isClient.getProfile(profileId);
75 83
		final Document doc = new SAXReader().read(new StringReader(profile));
76 84

  
77 85
		final boolean isReady = doc.valueOf("//CONFIGURATION/@status").equals(WorkflowsConstants.WorkflowStatus.EXECUTABLE.toString());
......
116 124
			final String localBaseUrl) throws MSROException {
117 125

  
118 126
		try {
119
			final String profile = isLookup.getProfile(profileId);
127
			final String profile = isClient.getProfile(profileId);
120 128
			final Document doc = new SAXReader().read(new StringReader(profile));
121 129

  
122 130
			final Map<String, String> globalParams = new HashMap<String, String>();
......
160 168

  
161 169
		worker.execute(wf, localBaseUrl, new AsyncClientCallback() {
162 170

  
171
			private ObjectMapper mapper = new ObjectMapper();
172

  
163 173
			@Override
164 174
			public void onGoing(final AsyncResponse res) {
165 175
				log.info("ONGOING - " + res.getResponseJson());
......
168 178
			@Override
169 179
			public void onFailed(final AsyncResponse res) {
170 180
				log.info("FAILED - " + res.getResponseJson());
171

  
181
				try {
182
					saveResponse(wf, false, mapper.readValue(res.getResponseJson(), new TypeReference<HashMap<String, Object>>() {}));
183
				} catch (final IOException e) {
184
					log.error("Error parsing async response: " + res.getResponseJson(), e);
185
					throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e);
186
				}
172 187
			}
173 188

  
174 189
			@Override
175 190
			public void onDone(final AsyncResponse res) {
176 191
				log.info("DONE - " + res.getResponseJson());
192
				try {
193
					saveResponse(wf, true, mapper.readValue(res.getResponseJson(), new TypeReference<HashMap<String, Object>>() {}));
194
				} catch (final IOException e) {
195
					log.error("Error parsing async response: " + res.getResponseJson(), e);
196
					throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e);
197
				}
177 198
			}
178 199
		});
179 200

  
......
181 202

  
182 203
	}
183 204

  
205
	private void saveResponse(final WorkflowInstance wf, final boolean success, final Map<String, String> res) {
206
		final String profileId = wf.getProfileId();
207

  
208
		if (!wf.isTemplate() && StringUtils.isNotBlank(profileId)) {
209
			try {
210
				final String template = IOUtils.toString(getClass().getResourceAsStream("/templates/workflow_status.xml.st"));
211
				final StringTemplate st = new StringTemplate(template);
212
				st.setAttribute("procId", res.getOrDefault(WorkflowsConstants.LOG_WF_PROCESS_ID, ""));
213
				st.setAttribute("date", res.getOrDefault(WorkflowsConstants.LOG_WF_PROCESS_END_DATE, DateUtils.now_ISO8601()));
214
				st.setAttribute("params", res);
215
				if (!success) {
216
					st.setAttribute("error", res.getOrDefault(WorkflowsConstants.LOG_SYSTEM_ERROR, "Unknown error"));
217
				}
218
				isClient.updateProfileNode(profileId, "//STATUS", st.toString());
219
			} catch (final Exception e) {
220
				log.error("Error saving wf response", e);
221
				throw new DnetGenericRuntimeException("Error saving wf response", e);
222
			}
223
		}
224

  
225
		/* TODO : generare la risposta da passare al manager server */
226
		/*
227
		 * dnetLogger.newLogMessage() .addDetails(process.getOutputParams()) .addDetail(WorkflowsConstants.LOG_WF_NAME, process.getName())
228
		 * .addDetail(WorkflowsConstants.LOG_WF_FAMILY, process.getFamily()) .addDetail(WorkflowsConstants.LOG_WF_PRIORITY, "" +
229
		 * process.getPriority()) .addDetail(WorkflowsConstants.LOG_WF_PROCESS_ID, process.getId())
230
		 * .addDetail(WorkflowsConstants.LOG_WF_PROCESS_STATUS, process.getStatus().toString())
231
		 * .addDetail(WorkflowsConstants.LOG_WF_PROCESS_START_DATE, Long.toString(process.getStartDate()))
232
		 * .addDetail(WorkflowsConstants.LOG_WF_PROCESS_END_DATE, Long.toString(process.getEndDate()))
233
		 * .addDetail(WorkflowsConstants.LOG_WF_PROFILE_ID, process.isTemplate() ? null : process.getProfileId())
234
		 * .addDetail(WorkflowsConstants.LOG_WF_PROFILE_TEMPLATE_ID, process.isTemplate() ? process.getProfileId() : null)
235
		 * .addDetail(WorkflowsConstants.LOG_WF_PARENT, process.getParentProfileId()) .addDetail(WorkflowsConstants.LOG_DATASOURCE_ID,
236
		 * process.getDsId()) .addDetail(WorkflowsConstants.LOG_DATASOURCE_NAME, process.getDsName())
237
		 * .addDetail(WorkflowsConstants.LOG_DATASOURCE_INTERFACE, process.getDsInterface()) .addDetail(WorkflowsConstants.LOG_SYSTEM_ERROR,
238
		 * process.getError()) .addDetail(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, process.getErrorStacktrace()) .flush();
239
		 *
240
		 * emailDispatcher.sendMails(process);
241
		 */}
242

  
184 243
}

Also available in: Unified diff