Project

General

Profile

« Previous | Next » 

Revision 46885

Partial response of async methods

View differences:

MsroWorkerController.java
1 1
package eu.dnetlib.msro.controllers;
2 2

  
3
import java.io.IOException;
3 4
import java.util.Arrays;
5
import java.util.HashMap;
4 6
import java.util.List;
7
import java.util.Map;
8
import java.util.Set;
5 9
import java.util.stream.Collectors;
6 10

  
7 11
import org.apache.commons.lang3.StringUtils;
......
20 24
import eu.dnetlib.clients.msro.TerminatedWfDesc;
21 25
import eu.dnetlib.enabling.annotations.DnetService;
22 26
import eu.dnetlib.enabling.annotations.DnetServiceType;
23
import eu.dnetlib.msro.annotations.ProcessNode;
27
import eu.dnetlib.msro.exceptions.MSROException;
24 28
import eu.dnetlib.msro.workers.aggregation.collect.CollectorPluginEnumerator;
25 29
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
26 30
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam;
......
28 32
import eu.dnetlib.msro.workers.aggregation.collect.plugins.ProtocolParameterType;
29 33
import eu.dnetlib.msro.workflows.ProcessStatus;
30 34
import eu.dnetlib.msro.workflows.WorkflowInstance;
31
import eu.dnetlib.msro.workflows.nodes.AbstractParallelProcessNode;
35
import eu.dnetlib.msro.workflows.procs.ProcessExecutor;
36
import eu.dnetlib.msro.workflows.procs.ProcessFactory;
32 37
import eu.dnetlib.msro.workflows.procs.ProcessRegistry;
33
import eu.dnetlib.msro.workflows.procs.WorkflowExecutor;
38
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
34 39
import eu.dnetlib.msro.workflows.util.NodeHelper;
35 40
import eu.dnetlib.services.BaseService;
36 41
import eu.dnetlib.services.ServiceRunningInstance;
37 42
import eu.dnetlib.services.ServiceStatus;
38 43
import eu.dnetlib.services.async.AsyncMethodException;
44
import eu.dnetlib.services.async.AsyncRunnable;
39 45
import eu.dnetlib.services.async.AsyncServerCallback;
40 46
import eu.dnetlib.services.async.HasAsyncMethods;
41 47

  
......
49 55
	private static final long MAX_NUMBER_OF_TERMINATED_WFS = 5;
50 56

  
51 57
	@Autowired
52
	private WorkflowExecutor workflowExecutor;
58
	private ProcessFactory processFactory;
53 59

  
54 60
	@Autowired
55 61
	private ProcessRegistry processRegistry;
56 62

  
57 63
	@Autowired
64
	private ProcessExecutor processExecutor;
65

  
66
	@Autowired
58 67
	private NodeHelper nodeHelper;
59 68

  
60 69
	@Autowired
61 70
	private CollectorPluginEnumerator collectorPlugins;
62 71

  
63 72
	@RequestMapping(value = "nodes", method = RequestMethod.GET)
64
	private List<String> nodes() {
65
		return nodeHelper.availableNodes().stream().map(AbstractParallelProcessNode::getNodeType).collect(Collectors.toList());
73
	private Set<String> nodes() {
74
		return nodeHelper.getValidTypes().keySet();
66 75
	}
67 76

  
68 77
	@RequestMapping(value = "collector/plugins", method = RequestMethod.GET)
......
72 81

  
73 82
	@RequestMapping(value = "pause", method = RequestMethod.GET)
74 83
	private ServiceRunningInstance pause() {
75
		workflowExecutor.setPaused(true);
84
		processExecutor.setPaused(true);
76 85
		return getStatus();
77 86
	}
78 87

  
79 88
	@RequestMapping(value = "resume", method = RequestMethod.GET)
80 89
	private ServiceRunningInstance resume() {
81
		workflowExecutor.setPaused(false);
90
		processExecutor.setPaused(false);
82 91
		return getStatus();
83 92
	}
84 93

  
......
87 96
		final Element nodes = DocumentHelper.createElement("WORKFLOW_NODES");
88 97
		final Element collPlugins = DocumentHelper.createElement("COLLECTOR_PLUGINS");
89 98

  
90
		nodeHelper.availableNodes().stream()
91
				.map(Object::getClass)
92
				.forEach(cl -> {
93
					final Element n = nodes.addElement("NODE");
94
					n.addAttribute("type", cl.getAnnotation(ProcessNode.class).value());
95
					n.addAttribute("class", cl.getName());
96
				});
99
		nodeHelper.getValidTypes().entrySet().forEach(e -> {
100
			final Element n = nodes.addElement("NODE");
101
			n.addAttribute("type", e.getKey());
102
			n.addAttribute("class", e.getValue().getName());
103
		});
97 104

  
98 105
		collectorPlugins.getAll().stream()
99 106
				.map(o -> o.getClass().getAnnotation(DnetCollectorPlugin.class))
......
120 127
		return MsroWorkerRunningInstance.newInstance(
121 128
				getProfileId(),
122 129
				getBaseUrl(),
123
				baseDir(), workflowExecutor.isPaused() ? ServiceStatus.PAUSED : ServiceStatus.ACTIVE,
130
				baseDir(), processExecutor.isPaused() ? ServiceStatus.PAUSED : ServiceStatus.ACTIVE,
124 131
				processRegistry.countRunningWfs(),
125 132
				processRegistry.countQueuedWfs(),
126 133
				processRegistry.listTerminated()
......
131 138
	}
132 139

  
133 140
	@Override
134
	public void processMethod(final String method, final String jsonParams, final AsyncServerCallback callback) throws AsyncMethodException {
141
	public AsyncRunnable prepareThread(final String method, final String jsonParams, final AsyncServerCallback callback) throws AsyncMethodException {
135 142
		final ObjectMapper mapper = new ObjectMapper();
136 143

  
137 144
		try {
138 145
			switch (method) {
139 146
			case "startWorkflow":
140
				workflowExecutor.startWorkflow(mapper.readValue(jsonParams, WorkflowInstance.class), callback);
141
				break;
147
				return new AsyncRunnable() {
148

  
149
					private WorkflowProcess proc;
150

  
151
					@Override
152
					public Map<String, String> prepare() throws AsyncMethodException {
153
						try {
154
							proc = processFactory.newProcess(mapper.readValue(jsonParams, WorkflowInstance.class), callback);
155

  
156
							final Map<String, String> map = new HashMap<>();
157
							map.put("procId", proc.getId());
158
							return map;
159
						} catch (final MSROException | IOException e) {
160
							throw new AsyncMethodException(e);
161
						}
162
					}
163

  
164
					@Override
165
					public void execute() throws AsyncMethodException {
166
						try {
167
							processExecutor.execute(proc);
168
						} catch (final MSROException e) {
169
							throw new AsyncMethodException(e);
170
						}
171
					}
172

  
173
				};
174

  
142 175
			default:
143 176
				log.warn("Invalid method: " + method);
144 177
				throw new AsyncMethodException("Invalid method: " + method);
......
147 180
			log.warn("Error executing method: " + method, e);
148 181
			throw new AsyncMethodException("Error executing method: " + method, e);
149 182
		}
183

  
150 184
	}
151 185

  
152 186
}

Also available in: Unified diff