Project

General

Profile

« Previous | Next » 

Revision 46999

fixed a bug in async response

View differences:

modules/dnet-springboot-apps/trunk/dnet-msro-application/src/main/java/eu/dnetlib/msro/controllers/MsroController.java
48 48

  
49 49
			@Override
50 50
			public void execute() throws AsyncMethodException {
51
				final WorkflowRef ref;
52 51
				try {
53 52
					switch (method) {
54 53
					case "startWorkflow":
55
						dispatcher.startWorkflow(mapper.readValue(jsonParams, Workflow.class), getBaseUrl());
54
						dispatcher.startWorkflow(mapper.readValue(jsonParams, Workflow.class), getBaseUrl(), callback);
56 55
						break;
57 56
					case "startWorkflowTemplate":
58
						dispatcher.startWorkflowTemplate(mapper.readValue(jsonParams, WorkflowTemplate.class), getBaseUrl());
57
						dispatcher.startWorkflowTemplate(mapper.readValue(jsonParams, WorkflowTemplate.class), getBaseUrl(), callback);
59 58
						break;
60 59
					default:
61 60
						log.warn("Invalid method: " + method);
......
77 76
			@RequestParam(required = false) final String dsId,
78 77
			@RequestParam(required = false) final String ifaceId) throws MSROException {
79 78

  
80
		return dispatcher.startWorkflow(wfId, parent, dsId, ifaceId, null);
79
		return dispatcher.startWorkflow(wfId, parent, dsId, ifaceId, null, null);
81 80

  
82 81
	}
83 82

  
modules/dnet-springboot-apps/trunk/dnet-msro-application/src/main/java/eu/dnetlib/msro/workflows/util/WorkflowDispatcher.java
33 33
import eu.dnetlib.msro.workflows.WorkflowRef;
34 34
import eu.dnetlib.services.async.AsyncClientCallback;
35 35
import eu.dnetlib.services.async.AsyncResponse;
36
import eu.dnetlib.services.async.AsyncServerCallback;
36 37

  
37 38
@Component
38 39
public class WorkflowDispatcher {
......
50 51

  
51 52
	public static final int DEFAULT_PRIORITY = 50;
52 53

  
53
	public WorkflowRef startWorkflow(final Workflow wf, final String localBaseUrl) throws MSROException {
54
		return startWorkflow(wf.getId(), wf.getParent(), wf.getDsId(), wf.getIface(), localBaseUrl);
54
	public WorkflowRef startWorkflow(final Workflow wf, final String localBaseUrl, final AsyncServerCallback callback) throws MSROException {
55
		return startWorkflow(wf.getId(), wf.getParent(), wf.getDsId(), wf.getIface(), localBaseUrl, callback);
55 56
	}
56 57

  
57 58
	public WorkflowRef startWorkflow(final String profileId,
58 59
			final String parent,
59 60
			final String dsId,
60 61
			final String iface,
61
			final String localBaseUrl) throws MSROException {
62
			final String localBaseUrl,
63
			final AsyncServerCallback callback) throws MSROException {
62 64
		try {
63 65
			final WorkflowInstance wf = newWorkflowInstance(profileId);
64 66

  
......
73 75
				wf.setDsInterface(iface);
74 76
			}
75 77
			// end
76
			return executeWf(wf, localBaseUrl);
78
			return executeWf(wf, localBaseUrl, callback);
77 79
		} catch (final Exception e) {
78 80
			log.error("Error parsing workflow: " + profileId, e);
79 81
			throw new MSROException("Error parsing workflow");
......
112 114
	}
113 115

  
114 116
	public WorkflowRef startWorkflowTemplate(final WorkflowTemplate wf,
115
			final String localBaseUrl) throws MSROException {
116
		return startWorkflowTemplate(wf.getId(), wf.getParent(), wf.getName(), wf.getFamily(), wf.getDsId(), wf.getIface(), wf.getParams(), localBaseUrl);
117
			final String localBaseUrl,
118
			final AsyncServerCallback callback) throws MSROException {
119
		return startWorkflowTemplate(wf.getId(), wf.getParent(), wf.getName(), wf.getFamily(), wf.getDsId(), wf.getIface(), wf.getParams(), localBaseUrl,
120
				callback);
117 121
	}
118 122

  
119 123
	public WorkflowRef startWorkflowTemplate(final String profileId,
......
123 127
			final String dsId,
124 128
			final String iface,
125 129
			final Map<String, String> params,
126
			final String localBaseUrl) throws MSROException {
130
			final String localBaseUrl,
131
			final AsyncServerCallback callback) throws MSROException {
127 132

  
128 133
		try {
129 134
			final String profile = isClient.getProfile(profileId);
......
156 161
			wf.setGlobalParams(globalParams);
157 162
			wf.setParent(parent);
158 163

  
159
			return executeWf(wf, localBaseUrl);
164
			return executeWf(wf, localBaseUrl, callback);
160 165
		} catch (final Exception e) {
161 166
			log.error("Error starting workflow template: " + profileId, e);
162 167
			throw new MSROException("Error starting workflow template", e);
163 168
		}
164 169
	}
165 170

  
166
	private WorkflowRef executeWf(final WorkflowInstance wf, final String localBaseUrl) {
171
	private WorkflowRef executeWf(final WorkflowInstance wf, final String localBaseUrl, final AsyncServerCallback callback) {
167 172

  
168 173
		// TODO Selezionare il worker in base alle caratterisiche del wf
169 174
		final MsroWorkerClient worker = clientFactory.getClient(MsroWorkerClient.class);
......
177 182
			@Override
178 183
			public void onGoing(final AsyncResponse res) {
179 184
				log.info("ONGOING - " + res.getResponseJson());
185

  
186
				if (callback != null) {
187
					callback.notify(res);
188
				}
180 189
			}
181 190

  
182 191
			@Override
......
184 193
				log.info("FAILED - " + res.getResponseJson());
185 194
				try {
186 195
					saveResponse(wf, false, mapper.readValue(res.getResponseJson(), new TypeReference<List<Pair<String, String>>>() {}));
196

  
197
					if (callback != null) {
198
						callback.notify(res);
199
					}
187 200
				} catch (final IOException e) {
188 201
					log.error("Error parsing async response: " + res.getResponseJson(), e);
189 202
					throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e);
......
195 208
				log.info("DONE - " + res.getResponseJson());
196 209
				try {
197 210
					saveResponse(wf, true, mapper.readValue(res.getResponseJson(), new TypeReference<List<Pair<String, String>>>() {}));
211

  
212
					if (callback != null) {
213
						callback.notify(res);
214
					}
198 215
				} catch (final IOException e) {
199 216
					log.error("Error parsing async response: " + res.getResponseJson(), e);
200 217
					throw new DnetGenericRuntimeException("Error final parsing async response: " + res.getResponseJson(), e);
modules/dnet-springboot-apps/trunk/dnet-administration-uis/src/main/resources/isStartupResources/profiles/workflows/workflow_test_03.xml
1
<RESOURCE_PROFILE>
2
    <HEADER>
3
        <RESOURCE_IDENTIFIER value="conf/workflow/d556b99f-bcb2-44c0-a52c-d75e2accf50d"/>
4
        <RESOURCE_TYPE value="workflow"/>
5
        <RESOURCE_KIND value="conf"/>
6
        <RESOURCE_URI value=""/>
7
        <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/>
8
    </HEADER>
9
    <BODY>
10
        <WORKFLOW_NAME menuSection="Example workflows">Hello workflow with Join</WORKFLOW_NAME>
11
        <WORKFLOW_DESCRIPTION>This is a test workflow</WORKFLOW_DESCRIPTION>
12
        <WORKFLOW_INFO/>
13
        <WORKFLOW_FAMILY>TEST</WORKFLOW_FAMILY>
14
        <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
15
        
16
        <CONFIGURATION status="EXECUTABLE" start="AUTO">
17
            <PARAMETERS/>
18
            <WORKFLOW>
19
                <NODE isStart="true" name="Hello1">
20
                    <DESCRIPTION/>
21
                    <PARAMETERS/>
22
                    <ARCS>
23
                        <ARC to="HelloJoin"/>
24
                    </ARCS>
25
                </NODE>
26
                <NODE isStart="true" name="Hello2">
27
                    <DESCRIPTION/>
28
                    <PARAMETERS/>
29
                    <ARCS>
30
                        <ARC to="HelloJoin"/>
31
                    </ARCS>
32
                </NODE>
33
                <NODE isStart="true" name="Hello3">
34
                    <DESCRIPTION/>
35
                    <PARAMETERS/>
36
                    <ARCS>
37
                        <ARC to="HelloJoin"/>
38
                    </ARCS>
39
                </NODE>
40
                <NODE name="HelloJoin" isJoin="true">
41
                    <DESCRIPTION/>
42
                    <PARAMETERS/>
43
                    <ARCS>
44
                        <ARC to="HelloDone"/>
45
                    </ARCS>
46
                </NODE>
47
                <NODE name="HelloDone">
48
                    <DESCRIPTION/>
49
                    <PARAMETERS/>
50
                    <ARCS>
51
                        <ARC to="success"/>
52
                    </ARCS>
53
                </NODE>
54
                
55
            </WORKFLOW>
56
        </CONFIGURATION>
57
        
58
        <NOTIFICATIONS/>
59
        
60
        <SCHEDULING enabled="false">
61
            <CRON>9 9 9 ? * *</CRON>
62
            <MININTERVAL>10080</MININTERVAL>
63
        </SCHEDULING>
64
        <STATUS/>
65
    </BODY>
66
</RESOURCE_PROFILE>
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workflows/procs/ProcessEngine.java
70 70

  
71 71
		try {
72 72
			for (final GraphNode node : process.getGraph().nextNodes(oldGraphNode, oldToken.getNextArc())) {
73
				log.info("Executing node: " + node.getName());
73
				log.info("Following arc: " + oldGraphNode.getName() + " -> " + node.getName());
74 74

  
75 75
				if (node.isJoin() || node.isSucessNode()) {
76 76
					if (!process.getPausedJoinNodeTokens().containsKey(node.getName())) {
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workflows/nodes/LaunchWorkflowJobNode.java
36 36
	@Override
37 37
	public final void execute(final Token token) {
38 38

  
39
		final String nodeDesc = toString();
40

  
41
		log.info("START NODE: " + nodeDesc);
39 42
		try {
40 43
			final MsroClient msro = serviceClientFactory.getClient(MsroClient.class);
41 44

  
......
49 52
				@Override
50 53
				public void onFailed(final AsyncResponse res) {
51 54
					log.error("Child workflow is failed");
55
					log.info("END NODE: " + nodeDesc);
52 56
					token.releaseAsFailed("Child workflow is failed");
53 57
				}
54 58

  
55 59
				@Override
56 60
				public void onDone(final AsyncResponse res) {
57 61
					log.debug("Child workflow has been completed successfully");
62
					log.info("END NODE: " + nodeDesc);
58 63
					token.release(Arc.DEFAULT_ARC);
59 64
				}
60 65
			});

Also available in: Unified diff