Revision 46999
Added by Michele Artini over 7 years ago
modules/dnet-springboot-apps/trunk/dnet-msro-application/src/main/java/eu/dnetlib/msro/controllers/MsroController.java | ||
---|---|---|
48 | 48 |
|
49 | 49 |
@Override |
50 | 50 |
public void execute() throws AsyncMethodException { |
51 |
final WorkflowRef ref; |
|
52 | 51 |
try { |
53 | 52 |
switch (method) { |
54 | 53 |
case "startWorkflow": |
55 |
dispatcher.startWorkflow(mapper.readValue(jsonParams, Workflow.class), getBaseUrl()); |
|
54 |
dispatcher.startWorkflow(mapper.readValue(jsonParams, Workflow.class), getBaseUrl(), callback);
|
|
56 | 55 |
break; |
57 | 56 |
case "startWorkflowTemplate": |
58 |
dispatcher.startWorkflowTemplate(mapper.readValue(jsonParams, WorkflowTemplate.class), getBaseUrl()); |
|
57 |
dispatcher.startWorkflowTemplate(mapper.readValue(jsonParams, WorkflowTemplate.class), getBaseUrl(), callback);
|
|
59 | 58 |
break; |
60 | 59 |
default: |
61 | 60 |
log.warn("Invalid method: " + method); |
... | ... | |
77 | 76 |
@RequestParam(required = false) final String dsId, |
78 | 77 |
@RequestParam(required = false) final String ifaceId) throws MSROException { |
79 | 78 |
|
80 |
return dispatcher.startWorkflow(wfId, parent, dsId, ifaceId, null); |
|
79 |
return dispatcher.startWorkflow(wfId, parent, dsId, ifaceId, null, null);
|
|
81 | 80 |
|
82 | 81 |
} |
83 | 82 |
|
modules/dnet-springboot-apps/trunk/dnet-msro-application/src/main/java/eu/dnetlib/msro/workflows/util/WorkflowDispatcher.java | ||
---|---|---|
33 | 33 |
import eu.dnetlib.msro.workflows.WorkflowRef; |
34 | 34 |
import eu.dnetlib.services.async.AsyncClientCallback; |
35 | 35 |
import eu.dnetlib.services.async.AsyncResponse; |
36 |
import eu.dnetlib.services.async.AsyncServerCallback; |
|
36 | 37 |
|
37 | 38 |
@Component |
38 | 39 |
public class WorkflowDispatcher { |
... | ... | |
50 | 51 |
|
51 | 52 |
public static final int DEFAULT_PRIORITY = 50; |
52 | 53 |
|
53 |
public WorkflowRef startWorkflow(final Workflow wf, final String localBaseUrl) throws MSROException { |
|
54 |
return startWorkflow(wf.getId(), wf.getParent(), wf.getDsId(), wf.getIface(), localBaseUrl); |
|
54 |
public WorkflowRef startWorkflow(final Workflow wf, final String localBaseUrl, final AsyncServerCallback callback) throws MSROException {
|
|
55 |
return startWorkflow(wf.getId(), wf.getParent(), wf.getDsId(), wf.getIface(), localBaseUrl, callback);
|
|
55 | 56 |
} |
56 | 57 |
|
57 | 58 |
public WorkflowRef startWorkflow(final String profileId, |
58 | 59 |
final String parent, |
59 | 60 |
final String dsId, |
60 | 61 |
final String iface, |
61 |
final String localBaseUrl) throws MSROException { |
|
62 |
final String localBaseUrl, |
|
63 |
final AsyncServerCallback callback) throws MSROException { |
|
62 | 64 |
try { |
63 | 65 |
final WorkflowInstance wf = newWorkflowInstance(profileId); |
64 | 66 |
|
... | ... | |
73 | 75 |
wf.setDsInterface(iface); |
74 | 76 |
} |
75 | 77 |
// end |
76 |
return executeWf(wf, localBaseUrl); |
|
78 |
return executeWf(wf, localBaseUrl, callback);
|
|
77 | 79 |
} catch (final Exception e) { |
78 | 80 |
log.error("Error parsing workflow: " + profileId, e); |
79 | 81 |
throw new MSROException("Error parsing workflow"); |
... | ... | |
112 | 114 |
} |
113 | 115 |
|
114 | 116 |
public WorkflowRef startWorkflowTemplate(final WorkflowTemplate wf, |
115 |
final String localBaseUrl) throws MSROException { |
|
116 |
return startWorkflowTemplate(wf.getId(), wf.getParent(), wf.getName(), wf.getFamily(), wf.getDsId(), wf.getIface(), wf.getParams(), localBaseUrl); |
|
117 |
final String localBaseUrl, |
|
118 |
final AsyncServerCallback callback) throws MSROException { |
|
119 |
return startWorkflowTemplate(wf.getId(), wf.getParent(), wf.getName(), wf.getFamily(), wf.getDsId(), wf.getIface(), wf.getParams(), localBaseUrl, |
|
120 |
callback); |
|
117 | 121 |
} |
118 | 122 |
|
119 | 123 |
public WorkflowRef startWorkflowTemplate(final String profileId, |
... | ... | |
123 | 127 |
final String dsId, |
124 | 128 |
final String iface, |
125 | 129 |
final Map<String, String> params, |
126 |
final String localBaseUrl) throws MSROException { |
|
130 |
final String localBaseUrl, |
|
131 |
final AsyncServerCallback callback) throws MSROException { |
|
127 | 132 |
|
128 | 133 |
try { |
129 | 134 |
final String profile = isClient.getProfile(profileId); |
... | ... | |
156 | 161 |
wf.setGlobalParams(globalParams); |
157 | 162 |
wf.setParent(parent); |
158 | 163 |
|
159 |
return executeWf(wf, localBaseUrl); |
|
164 |
return executeWf(wf, localBaseUrl, callback);
|
|
160 | 165 |
} catch (final Exception e) { |
161 | 166 |
log.error("Error starting workflow template: " + profileId, e); |
162 | 167 |
throw new MSROException("Error starting workflow template", e); |
163 | 168 |
} |
164 | 169 |
} |
165 | 170 |
|
166 |
private WorkflowRef executeWf(final WorkflowInstance wf, final String localBaseUrl) { |
|
171 |
private WorkflowRef executeWf(final WorkflowInstance wf, final String localBaseUrl, final AsyncServerCallback callback) {
|
|
167 | 172 |
|
168 | 173 |
// TODO Selezionare il worker in base alle caratterisiche del wf |
169 | 174 |
final MsroWorkerClient worker = clientFactory.getClient(MsroWorkerClient.class); |
... | ... | |
177 | 182 |
@Override |
178 | 183 |
public void onGoing(final AsyncResponse res) { |
179 | 184 |
log.info("ONGOING - " + res.getResponseJson()); |
185 |
|
|
186 |
if (callback != null) { |
|
187 |
callback.notify(res); |
|
188 |
} |
|
180 | 189 |
} |
181 | 190 |
|
182 | 191 |
@Override |
... | ... | |
184 | 193 |
log.info("FAILED - " + res.getResponseJson()); |
185 | 194 |
try { |
186 | 195 |
saveResponse(wf, false, mapper.readValue(res.getResponseJson(), new TypeReference<List<Pair<String, String>>>() {})); |
196 |
|
|
197 |
if (callback != null) { |
|
198 |
callback.notify(res); |
|
199 |
} |
|
187 | 200 |
} catch (final IOException e) { |
188 | 201 |
log.error("Error parsing async response: " + res.getResponseJson(), e); |
189 | 202 |
throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e); |
... | ... | |
195 | 208 |
log.info("DONE - " + res.getResponseJson()); |
196 | 209 |
try { |
197 | 210 |
saveResponse(wf, true, mapper.readValue(res.getResponseJson(), new TypeReference<List<Pair<String, String>>>() {})); |
211 |
|
|
212 |
if (callback != null) { |
|
213 |
callback.notify(res); |
|
214 |
} |
|
198 | 215 |
} catch (final IOException e) { |
199 | 216 |
log.error("Error parsing async response: " + res.getResponseJson(), e); |
200 | 217 |
throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e); |
modules/dnet-springboot-apps/trunk/dnet-administration-uis/src/main/resources/isStartupResources/profiles/workflows/workflow_test_03.xml | ||
---|---|---|
1 |
<RESOURCE_PROFILE> |
|
2 |
<HEADER> |
|
3 |
<RESOURCE_IDENTIFIER value="conf/workflow/d556b99f-bcb2-44c0-a52c-d75e2accf50d"/> |
|
4 |
<RESOURCE_TYPE value="workflow"/> |
|
5 |
<RESOURCE_KIND value="conf"/> |
|
6 |
<RESOURCE_URI value=""/> |
|
7 |
<DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/> |
|
8 |
</HEADER> |
|
9 |
<BODY> |
|
10 |
<WORKFLOW_NAME menuSection="Example workflows">Hello workflow with Join</WORKFLOW_NAME> |
|
11 |
<WORKFLOW_DESCRIPTION>This is a test workflow</WORKFLOW_DESCRIPTION> |
|
12 |
<WORKFLOW_INFO/> |
|
13 |
<WORKFLOW_FAMILY>TEST</WORKFLOW_FAMILY> |
|
14 |
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY> |
|
15 |
|
|
16 |
<CONFIGURATION status="EXECUTABLE" start="AUTO"> |
|
17 |
<PARAMETERS/> |
|
18 |
<WORKFLOW> |
|
19 |
<NODE isStart="true" name="Hello1"> |
|
20 |
<DESCRIPTION/> |
|
21 |
<PARAMETERS/> |
|
22 |
<ARCS> |
|
23 |
<ARC to="HelloJoin"/> |
|
24 |
</ARCS> |
|
25 |
</NODE> |
|
26 |
<NODE isStart="true" name="Hello2"> |
|
27 |
<DESCRIPTION/> |
|
28 |
<PARAMETERS/> |
|
29 |
<ARCS> |
|
30 |
<ARC to="HelloJoin"/> |
|
31 |
</ARCS> |
|
32 |
</NODE> |
|
33 |
<NODE isStart="true" name="Hello3"> |
|
34 |
<DESCRIPTION/> |
|
35 |
<PARAMETERS/> |
|
36 |
<ARCS> |
|
37 |
<ARC to="HelloJoin"/> |
|
38 |
</ARCS> |
|
39 |
</NODE> |
|
40 |
<NODE name="HelloJoin" isJoin="true"> |
|
41 |
<DESCRIPTION/> |
|
42 |
<PARAMETERS/> |
|
43 |
<ARCS> |
|
44 |
<ARC to="HelloDone"/> |
|
45 |
</ARCS> |
|
46 |
</NODE> |
|
47 |
<NODE name="HelloDone"> |
|
48 |
<DESCRIPTION/> |
|
49 |
<PARAMETERS/> |
|
50 |
<ARCS> |
|
51 |
<ARC to="success"/> |
|
52 |
</ARCS> |
|
53 |
</NODE> |
|
54 |
|
|
55 |
</WORKFLOW> |
|
56 |
</CONFIGURATION> |
|
57 |
|
|
58 |
<NOTIFICATIONS/> |
|
59 |
|
|
60 |
<SCHEDULING enabled="false"> |
|
61 |
<CRON>9 9 9 ? * *</CRON> |
|
62 |
<MININTERVAL>10080</MININTERVAL> |
|
63 |
</SCHEDULING> |
|
64 |
<STATUS/> |
|
65 |
</BODY> |
|
66 |
</RESOURCE_PROFILE> |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workflows/procs/ProcessEngine.java | ||
---|---|---|
70 | 70 |
|
71 | 71 |
try { |
72 | 72 |
for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getNextArc())) { |
73 |
log.info("Executing node: " + node.getName());
|
|
73 |
log.info("Following arc: " + oldGraphNode.getName() + " -> " + node.getName());
|
|
74 | 74 |
|
75 | 75 |
if (node.isJoin() || node.isSucessNode()) { |
76 | 76 |
if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) { |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workflows/nodes/LaunchWorkflowJobNode.java | ||
---|---|---|
36 | 36 |
@Override |
37 | 37 |
public final void execute(final Token token) { |
38 | 38 |
|
39 |
final String nodeDesc = toString(); |
|
40 |
|
|
41 |
log.info("START NODE: " + nodeDesc); |
|
39 | 42 |
try { |
40 | 43 |
final MsroClient msro = serviceClientFactory.getClient(MsroClient.class); |
41 | 44 |
|
... | ... | |
49 | 52 |
@Override |
50 | 53 |
public void onFailed(final AsyncResponse res) { |
51 | 54 |
log.error("Child workflow is failed"); |
55 |
log.info("END NODE: " + nodeDesc); |
|
52 | 56 |
token.releaseAsFailed("Child workflow is failed"); |
53 | 57 |
} |
54 | 58 |
|
55 | 59 |
@Override |
56 | 60 |
public void onDone(final AsyncResponse res) { |
57 | 61 |
log.debug("Child workflow has been completed successfully"); |
62 |
log.info("END NODE: " + nodeDesc); |
|
58 | 63 |
token.release(Arc.DEFAULT_ARC); |
59 | 64 |
} |
60 | 65 |
}); |
Also available in: Unified diff
fixed a bug in async response