Revision 46885
Added by Michele Artini about 7 years ago
MsroWorkerController.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.controllers; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
3 | 4 |
import java.util.Arrays; |
5 |
import java.util.HashMap; |
|
4 | 6 |
import java.util.List; |
7 |
import java.util.Map; |
|
8 |
import java.util.Set; |
|
5 | 9 |
import java.util.stream.Collectors; |
6 | 10 |
|
7 | 11 |
import org.apache.commons.lang3.StringUtils; |
... | ... | |
20 | 24 |
import eu.dnetlib.clients.msro.TerminatedWfDesc; |
21 | 25 |
import eu.dnetlib.enabling.annotations.DnetService; |
22 | 26 |
import eu.dnetlib.enabling.annotations.DnetServiceType; |
23 |
import eu.dnetlib.msro.annotations.ProcessNode;
|
|
27 |
import eu.dnetlib.msro.exceptions.MSROException;
|
|
24 | 28 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectorPluginEnumerator; |
25 | 29 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
26 | 30 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam; |
... | ... | |
28 | 32 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.ProtocolParameterType; |
29 | 33 |
import eu.dnetlib.msro.workflows.ProcessStatus; |
30 | 34 |
import eu.dnetlib.msro.workflows.WorkflowInstance; |
31 |
import eu.dnetlib.msro.workflows.nodes.AbstractParallelProcessNode; |
|
35 |
import eu.dnetlib.msro.workflows.procs.ProcessExecutor; |
|
36 |
import eu.dnetlib.msro.workflows.procs.ProcessFactory; |
|
32 | 37 |
import eu.dnetlib.msro.workflows.procs.ProcessRegistry; |
33 |
import eu.dnetlib.msro.workflows.procs.WorkflowExecutor;
|
|
38 |
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
|
|
34 | 39 |
import eu.dnetlib.msro.workflows.util.NodeHelper; |
35 | 40 |
import eu.dnetlib.services.BaseService; |
36 | 41 |
import eu.dnetlib.services.ServiceRunningInstance; |
37 | 42 |
import eu.dnetlib.services.ServiceStatus; |
38 | 43 |
import eu.dnetlib.services.async.AsyncMethodException; |
44 |
import eu.dnetlib.services.async.AsyncRunnable; |
|
39 | 45 |
import eu.dnetlib.services.async.AsyncServerCallback; |
40 | 46 |
import eu.dnetlib.services.async.HasAsyncMethods; |
41 | 47 |
|
... | ... | |
49 | 55 |
private static final long MAX_NUMBER_OF_TERMINATED_WFS = 5; |
50 | 56 |
|
51 | 57 |
@Autowired |
52 |
private WorkflowExecutor workflowExecutor;
|
|
58 |
private ProcessFactory processFactory;
|
|
53 | 59 |
|
54 | 60 |
@Autowired |
55 | 61 |
private ProcessRegistry processRegistry; |
56 | 62 |
|
57 | 63 |
@Autowired |
64 |
private ProcessExecutor processExecutor; |
|
65 |
|
|
66 |
@Autowired |
|
58 | 67 |
private NodeHelper nodeHelper; |
59 | 68 |
|
60 | 69 |
@Autowired |
61 | 70 |
private CollectorPluginEnumerator collectorPlugins; |
62 | 71 |
|
63 | 72 |
@RequestMapping(value = "nodes", method = RequestMethod.GET) |
64 |
private List<String> nodes() {
|
|
65 |
return nodeHelper.availableNodes().stream().map(AbstractParallelProcessNode::getNodeType).collect(Collectors.toList());
|
|
73 |
private Set<String> nodes() {
|
|
74 |
return nodeHelper.getValidTypes().keySet();
|
|
66 | 75 |
} |
67 | 76 |
|
68 | 77 |
@RequestMapping(value = "collector/plugins", method = RequestMethod.GET) |
... | ... | |
72 | 81 |
|
73 | 82 |
@RequestMapping(value = "pause", method = RequestMethod.GET) |
74 | 83 |
private ServiceRunningInstance pause() { |
75 |
workflowExecutor.setPaused(true);
|
|
84 |
processExecutor.setPaused(true);
|
|
76 | 85 |
return getStatus(); |
77 | 86 |
} |
78 | 87 |
|
79 | 88 |
@RequestMapping(value = "resume", method = RequestMethod.GET) |
80 | 89 |
private ServiceRunningInstance resume() { |
81 |
workflowExecutor.setPaused(false);
|
|
90 |
processExecutor.setPaused(false);
|
|
82 | 91 |
return getStatus(); |
83 | 92 |
} |
84 | 93 |
|
... | ... | |
87 | 96 |
final Element nodes = DocumentHelper.createElement("WORKFLOW_NODES"); |
88 | 97 |
final Element collPlugins = DocumentHelper.createElement("COLLECTOR_PLUGINS"); |
89 | 98 |
|
90 |
nodeHelper.availableNodes().stream() |
|
91 |
.map(Object::getClass) |
|
92 |
.forEach(cl -> { |
|
93 |
final Element n = nodes.addElement("NODE"); |
|
94 |
n.addAttribute("type", cl.getAnnotation(ProcessNode.class).value()); |
|
95 |
n.addAttribute("class", cl.getName()); |
|
96 |
}); |
|
99 |
nodeHelper.getValidTypes().entrySet().forEach(e -> { |
|
100 |
final Element n = nodes.addElement("NODE"); |
|
101 |
n.addAttribute("type", e.getKey()); |
|
102 |
n.addAttribute("class", e.getValue().getName()); |
|
103 |
}); |
|
97 | 104 |
|
98 | 105 |
collectorPlugins.getAll().stream() |
99 | 106 |
.map(o -> o.getClass().getAnnotation(DnetCollectorPlugin.class)) |
... | ... | |
120 | 127 |
return MsroWorkerRunningInstance.newInstance( |
121 | 128 |
getProfileId(), |
122 | 129 |
getBaseUrl(), |
123 |
baseDir(), workflowExecutor.isPaused() ? ServiceStatus.PAUSED : ServiceStatus.ACTIVE,
|
|
130 |
baseDir(), processExecutor.isPaused() ? ServiceStatus.PAUSED : ServiceStatus.ACTIVE,
|
|
124 | 131 |
processRegistry.countRunningWfs(), |
125 | 132 |
processRegistry.countQueuedWfs(), |
126 | 133 |
processRegistry.listTerminated() |
... | ... | |
131 | 138 |
} |
132 | 139 |
|
133 | 140 |
@Override |
134 |
public void processMethod(final String method, final String jsonParams, final AsyncServerCallback callback) throws AsyncMethodException {
|
|
141 |
public AsyncRunnable prepareThread(final String method, final String jsonParams, final AsyncServerCallback callback) throws AsyncMethodException {
|
|
135 | 142 |
final ObjectMapper mapper = new ObjectMapper(); |
136 | 143 |
|
137 | 144 |
try { |
138 | 145 |
switch (method) { |
139 | 146 |
case "startWorkflow": |
140 |
workflowExecutor.startWorkflow(mapper.readValue(jsonParams, WorkflowInstance.class), callback); |
|
141 |
break; |
|
147 |
return new AsyncRunnable() { |
|
148 |
|
|
149 |
private WorkflowProcess proc; |
|
150 |
|
|
151 |
@Override |
|
152 |
public Map<String, String> prepare() throws AsyncMethodException { |
|
153 |
try { |
|
154 |
proc = processFactory.newProcess(mapper.readValue(jsonParams, WorkflowInstance.class), callback); |
|
155 |
|
|
156 |
final Map<String, String> map = new HashMap<>(); |
|
157 |
map.put("procId", proc.getId()); |
|
158 |
return map; |
|
159 |
} catch (final MSROException | IOException e) { |
|
160 |
throw new AsyncMethodException(e); |
|
161 |
} |
|
162 |
} |
|
163 |
|
|
164 |
@Override |
|
165 |
public void execute() throws AsyncMethodException { |
|
166 |
try { |
|
167 |
processExecutor.execute(proc); |
|
168 |
} catch (final MSROException e) { |
|
169 |
throw new AsyncMethodException(e); |
|
170 |
} |
|
171 |
} |
|
172 |
|
|
173 |
}; |
|
174 |
|
|
142 | 175 |
default: |
143 | 176 |
log.warn("Invalid method: " + method); |
144 | 177 |
throw new AsyncMethodException("Invalid method: " + method); |
... | ... | |
147 | 180 |
log.warn("Error executing method: " + method, e); |
148 | 181 |
throw new AsyncMethodException("Error executing method: " + method, e); |
149 | 182 |
} |
183 |
|
|
150 | 184 |
} |
151 | 185 |
|
152 | 186 |
} |
Also available in: Unified diff
Partial response of async methods