Project

General

Profile

1
package eu.dnetlib.msro.controllers;
2

    
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.HashMap;
6
import java.util.List;
7
import java.util.Map;
8
import java.util.Set;
9
import java.util.stream.Collectors;
10

    
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.dom4j.DocumentHelper;
15
import org.dom4j.Element;
16
import org.springframework.beans.factory.annotation.Autowired;
17
import org.springframework.web.bind.annotation.RequestMapping;
18
import org.springframework.web.bind.annotation.RequestMethod;
19
import org.springframework.web.bind.annotation.RestController;
20

    
21
import com.fasterxml.jackson.databind.ObjectMapper;
22

    
23
import eu.dnetlib.clients.msro.MsroWorkerRunningInstance;
24
import eu.dnetlib.clients.msro.TerminatedWfDesc;
25
import eu.dnetlib.enabling.annotations.DnetService;
26
import eu.dnetlib.enabling.annotations.DnetServiceType;
27
import eu.dnetlib.msro.exceptions.MSROException;
28
import eu.dnetlib.msro.workers.aggregation.collect.CollectorPluginEnumerator;
29
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
30
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam;
31
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
32
import eu.dnetlib.msro.workers.aggregation.collect.plugins.ProtocolParameterType;
33
import eu.dnetlib.msro.workflows.ProcessStatus;
34
import eu.dnetlib.msro.workflows.WorkflowInstance;
35
import eu.dnetlib.msro.workflows.procs.ProcessExecutor;
36
import eu.dnetlib.msro.workflows.procs.ProcessFactory;
37
import eu.dnetlib.msro.workflows.procs.ProcessRegistry;
38
import eu.dnetlib.msro.workflows.procs.WorkflowProcess;
39
import eu.dnetlib.msro.workflows.util.NodeHelper;
40
import eu.dnetlib.services.BaseService;
41
import eu.dnetlib.services.ServiceRunningInstance;
42
import eu.dnetlib.services.ServiceStatus;
43
import eu.dnetlib.services.async.AsyncMethodException;
44
import eu.dnetlib.services.async.AsyncRunnable;
45
import eu.dnetlib.services.async.AsyncServerCallback;
46
import eu.dnetlib.services.async.HasAsyncMethods;
47

    
48
@RestController
49
@RequestMapping("/worker")
50
@DnetService(DnetServiceType.msroWorker)
51
public class MsroWorkerController extends BaseService implements HasAsyncMethods {
52

    
53
	private static final Log log = LogFactory.getLog(MsroWorkerController.class);
54

    
55
	private static final long MAX_NUMBER_OF_TERMINATED_WFS = 5;
56

    
57
	@Autowired
58
	private ProcessFactory processFactory;
59

    
60
	@Autowired
61
	private ProcessRegistry processRegistry;
62

    
63
	@Autowired
64
	private ProcessExecutor processExecutor;
65

    
66
	@Autowired
67
	private NodeHelper nodeHelper;
68

    
69
	@Autowired
70
	private CollectorPluginEnumerator collectorPlugins;
71

    
72
	@RequestMapping(value = "nodes", method = RequestMethod.GET)
73
	private Set<String> nodes() {
74
		return nodeHelper.getValidTypes().keySet();
75
	}
76

    
77
	@RequestMapping(value = "collector/plugins", method = RequestMethod.GET)
78
	private List<String> plugins() {
79
		return collectorPlugins.getAll().stream().map(CollectorPlugin::getProtocol).sorted().collect(Collectors.toList());
80
	}
81

    
82
	@RequestMapping(value = "pause", method = RequestMethod.GET)
83
	private ServiceRunningInstance pause() {
84
		processExecutor.setPaused(true);
85
		return getStatus();
86
	}
87

    
88
	@RequestMapping(value = "resume", method = RequestMethod.GET)
89
	private ServiceRunningInstance resume() {
90
		processExecutor.setPaused(false);
91
		return getStatus();
92
	}
93

    
94
	@Override
95
	public List<Element> geXmlProfileSections() {
96
		final Element nodes = DocumentHelper.createElement("WORKFLOW_NODES");
97
		final Element collPlugins = DocumentHelper.createElement("COLLECTOR_PLUGINS");
98

    
99
		nodeHelper.getValidTypes().entrySet().forEach(e -> {
100
			final Element n = nodes.addElement("NODE");
101
			n.addAttribute("type", e.getKey());
102
			n.addAttribute("class", e.getValue().getName());
103
		});
104

    
105
		collectorPlugins.getAll().stream()
106
				.map(o -> o.getClass().getAnnotation(DnetCollectorPlugin.class))
107
				.forEach(ann -> {
108
					final Element plugin = collPlugins.addElement("PLUGIN");
109
					plugin.addAttribute("protocol", ann.value());
110
					for (final DnetCollectorParam ap : ann.parameters()) {
111
						final Element p = plugin.addElement("PARAM");
112
						p.addAttribute("name", ap.value());
113
						p.addAttribute("type", ap.type().toString());
114
						p.addAttribute("optional", Boolean.toString(ap.optional()));
115
						if (StringUtils.isNotBlank(ap.regex()) && (ap.type() == ProtocolParameterType.TEXT)) {
116
							p.addAttribute("regex", ap.regex());
117
						}
118
					}
119
				});
120

    
121
		return Arrays.asList(nodes, collPlugins);
122
	}
123

    
124
	@Override
125
	public ServiceRunningInstance getStatus() {
126

    
127
		return MsroWorkerRunningInstance.newInstance(
128
				getProfileId(),
129
				getBaseUrl(),
130
				baseDir(), processExecutor.isPaused() ? ServiceStatus.PAUSED : ServiceStatus.ACTIVE,
131
				processRegistry.countRunningWfs(),
132
				processRegistry.countQueuedWfs(),
133
				processRegistry.listTerminated()
134
						.stream()
135
						.limit(MAX_NUMBER_OF_TERMINATED_WFS)
136
						.map(wf -> new TerminatedWfDesc(wf.getId(), wf.getStatus() == ProcessStatus.SUCCESS, wf.getEndDate()))
137
						.collect(Collectors.toList()));
138
	}
139

    
140
	@Override
141
	public AsyncRunnable prepareThread(final String method, final String jsonParams, final AsyncServerCallback callback) throws AsyncMethodException {
142
		final ObjectMapper mapper = new ObjectMapper();
143

    
144
		try {
145
			switch (method) {
146
			case "startWorkflow":
147
				return new AsyncRunnable() {
148

    
149
					private WorkflowProcess proc;
150

    
151
					@Override
152
					public Map<String, String> prepare() throws AsyncMethodException {
153
						try {
154
							proc = processFactory.newProcess(mapper.readValue(jsonParams, WorkflowInstance.class), callback);
155

    
156
							final Map<String, String> map = new HashMap<>();
157
							map.put("procId", proc.getId());
158
							return map;
159
						} catch (final MSROException | IOException e) {
160
							throw new AsyncMethodException(e);
161
						}
162
					}
163

    
164
					@Override
165
					public void execute() throws AsyncMethodException {
166
						try {
167
							processExecutor.execute(proc);
168
						} catch (final MSROException e) {
169
							throw new AsyncMethodException(e);
170
						}
171
					}
172

    
173
				};
174

    
175
			default:
176
				log.warn("Invalid method: " + method);
177
				throw new AsyncMethodException("Invalid method: " + method);
178
			}
179
		} catch (final Throwable e) {
180
			log.warn("Error executing method: " + method, e);
181
			throw new AsyncMethodException("Error executing method: " + method, e);
182
		}
183

    
184
	}
185

    
186
}
    (1-1/1)