Revision 46794
Added by Michele Artini about 7 years ago
WorkflowDispatcher.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.workflows.util; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
3 | 4 |
import java.io.StringReader; |
4 | 5 |
import java.util.HashMap; |
5 | 6 |
import java.util.Map; |
6 | 7 |
|
8 |
import org.antlr.stringtemplate.StringTemplate; |
|
9 |
import org.apache.commons.io.IOUtils; |
|
7 | 10 |
import org.apache.commons.lang3.StringUtils; |
8 | 11 |
import org.apache.commons.lang3.math.NumberUtils; |
9 | 12 |
import org.apache.commons.logging.Log; |
... | ... | |
14 | 17 |
import org.springframework.beans.factory.annotation.Autowired; |
15 | 18 |
import org.springframework.stereotype.Component; |
16 | 19 |
|
20 |
import com.fasterxml.jackson.core.type.TypeReference; |
|
21 |
import com.fasterxml.jackson.databind.ObjectMapper; |
|
22 |
|
|
17 | 23 |
import eu.dnetlib.clients.is.InformationServiceClient; |
18 | 24 |
import eu.dnetlib.clients.locators.ServiceClientFactory; |
19 | 25 |
import eu.dnetlib.clients.msro.MsroWorkerClient; |
20 | 26 |
import eu.dnetlib.clients.msro.Workflow; |
21 | 27 |
import eu.dnetlib.clients.msro.WorkflowTemplate; |
28 |
import eu.dnetlib.exceptions.DnetGenericRuntimeException; |
|
29 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
22 | 30 |
import eu.dnetlib.msro.exceptions.MSROException; |
23 | 31 |
import eu.dnetlib.msro.workflows.WorkflowInstance; |
24 | 32 |
import eu.dnetlib.services.async.AsyncClientCallback; |
... | ... | |
30 | 38 |
private static final Log log = LogFactory.getLog(WorkflowDispatcher.class); |
31 | 39 |
|
32 | 40 |
@Autowired |
33 |
private InformationServiceClient isLookup;
|
|
41 |
private InformationServiceClient isClient;
|
|
34 | 42 |
|
35 | 43 |
@Autowired |
36 | 44 |
private GraphLoader graphLoader; |
... | ... | |
71 | 79 |
} |
72 | 80 |
|
73 | 81 |
private WorkflowInstance newWorkflowInstance(final String profileId) throws Exception { |
74 |
final String profile = isLookup.getProfile(profileId);
|
|
82 |
final String profile = isClient.getProfile(profileId);
|
|
75 | 83 |
final Document doc = new SAXReader().read(new StringReader(profile)); |
76 | 84 |
|
77 | 85 |
final boolean isReady = doc.valueOf("//CONFIGURATION/@status").equals(WorkflowsConstants.WorkflowStatus.EXECUTABLE.toString()); |
... | ... | |
116 | 124 |
final String localBaseUrl) throws MSROException { |
117 | 125 |
|
118 | 126 |
try { |
119 |
final String profile = isLookup.getProfile(profileId);
|
|
127 |
final String profile = isClient.getProfile(profileId);
|
|
120 | 128 |
final Document doc = new SAXReader().read(new StringReader(profile)); |
121 | 129 |
|
122 | 130 |
final Map<String, String> globalParams = new HashMap<String, String>(); |
... | ... | |
160 | 168 |
|
161 | 169 |
worker.execute(wf, localBaseUrl, new AsyncClientCallback() { |
162 | 170 |
|
171 |
private ObjectMapper mapper = new ObjectMapper(); |
|
172 |
|
|
163 | 173 |
@Override |
164 | 174 |
public void onGoing(final AsyncResponse res) { |
165 | 175 |
log.info("ONGOING - " + res.getResponseJson()); |
... | ... | |
168 | 178 |
@Override |
169 | 179 |
public void onFailed(final AsyncResponse res) { |
170 | 180 |
log.info("FAILED - " + res.getResponseJson()); |
171 |
|
|
181 |
try { |
|
182 |
saveResponse(wf, false, mapper.readValue(res.getResponseJson(), new TypeReference<HashMap<String, Object>>() {})); |
|
183 |
} catch (final IOException e) { |
|
184 |
log.error("Error parsing async response: " + res.getResponseJson(), e); |
|
185 |
throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e); |
|
186 |
} |
|
172 | 187 |
} |
173 | 188 |
|
174 | 189 |
@Override |
175 | 190 |
public void onDone(final AsyncResponse res) { |
176 | 191 |
log.info("DONE - " + res.getResponseJson()); |
192 |
try { |
|
193 |
saveResponse(wf, true, mapper.readValue(res.getResponseJson(), new TypeReference<HashMap<String, Object>>() {})); |
|
194 |
} catch (final IOException e) { |
|
195 |
log.error("Error parsing async response: " + res.getResponseJson(), e); |
|
196 |
throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e); |
|
197 |
} |
|
177 | 198 |
} |
178 | 199 |
}); |
179 | 200 |
|
... | ... | |
181 | 202 |
|
182 | 203 |
} |
183 | 204 |
|
205 |
private void saveResponse(final WorkflowInstance wf, final boolean success, final Map<String, String> res) { |
|
206 |
final String profileId = wf.getProfileId(); |
|
207 |
|
|
208 |
if (!wf.isTemplate() && StringUtils.isNotBlank(profileId)) { |
|
209 |
try { |
|
210 |
final String template = IOUtils.toString(getClass().getResourceAsStream("/templates/workflow_status.xml.st")); |
|
211 |
final StringTemplate st = new StringTemplate(template); |
|
212 |
st.setAttribute("procId", res.getOrDefault(WorkflowsConstants.LOG_WF_PROCESS_ID, "")); |
|
213 |
st.setAttribute("date", res.getOrDefault(WorkflowsConstants.LOG_WF_PROCESS_END_DATE, DateUtils.now_ISO8601())); |
|
214 |
st.setAttribute("params", res); |
|
215 |
if (!success) { |
|
216 |
st.setAttribute("error", res.getOrDefault(WorkflowsConstants.LOG_SYSTEM_ERROR, "Unknown error")); |
|
217 |
} |
|
218 |
isClient.updateProfileNode(profileId, "//STATUS", st.toString()); |
|
219 |
} catch (final Exception e) { |
|
220 |
log.error("Error saving wf response", e); |
|
221 |
throw new DnetGenericRuntimeException("Error saving wf response", e); |
|
222 |
} |
|
223 |
} |
|
224 |
|
|
225 |
/* TODO : generare la risposta da passare al manager server */ |
|
226 |
/* |
|
227 |
* dnetLogger.newLogMessage() .addDetails(process.getOutputParams()) .addDetail(WorkflowsConstants.LOG_WF_NAME, process.getName()) |
|
228 |
* .addDetail(WorkflowsConstants.LOG_WF_FAMILY, process.getFamily()) .addDetail(WorkflowsConstants.LOG_WF_PRIORITY, "" + |
|
229 |
* process.getPriority()) .addDetail(WorkflowsConstants.LOG_WF_PROCESS_ID, process.getId()) |
|
230 |
* .addDetail(WorkflowsConstants.LOG_WF_PROCESS_STATUS, process.getStatus().toString()) |
|
231 |
* .addDetail(WorkflowsConstants.LOG_WF_PROCESS_START_DATE, Long.toString(process.getStartDate())) |
|
232 |
* .addDetail(WorkflowsConstants.LOG_WF_PROCESS_END_DATE, Long.toString(process.getEndDate())) |
|
233 |
* .addDetail(WorkflowsConstants.LOG_WF_PROFILE_ID, process.isTemplate() ? null : process.getProfileId()) |
|
234 |
* .addDetail(WorkflowsConstants.LOG_WF_PROFILE_TEMPLATE_ID, process.isTemplate() ? process.getProfileId() : null) |
|
235 |
* .addDetail(WorkflowsConstants.LOG_WF_PARENT, process.getParentProfileId()) .addDetail(WorkflowsConstants.LOG_DATASOURCE_ID, |
|
236 |
* process.getDsId()) .addDetail(WorkflowsConstants.LOG_DATASOURCE_NAME, process.getDsName()) |
|
237 |
* .addDetail(WorkflowsConstants.LOG_DATASOURCE_INTERFACE, process.getDsInterface()) .addDetail(WorkflowsConstants.LOG_SYSTEM_ERROR, |
|
238 |
* process.getError()) .addDetail(WorkflowsConstants.LOG_SYSTEM_ERROR_STACKTRACE, process.getErrorStacktrace()) .flush(); |
|
239 |
* |
|
240 |
* emailDispatcher.sendMails(process); |
|
241 |
*/} |
|
242 |
|
|
184 | 243 |
} |
Also available in: Unified diff
wf execution