Revision 47472
Added by Michele Artini over 7 years ago
modules/dnet-springboot-apps/trunk/dnet-msro-application/src/main/java/eu/dnetlib/msro/MsroController.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
4 |
import java.io.StringReader; |
|
3 | 5 |
import java.util.HashMap; |
4 | 6 |
import java.util.List; |
5 | 7 |
import java.util.Map; |
6 | 8 |
|
7 | 9 |
import org.apache.commons.logging.Log; |
8 | 10 |
import org.apache.commons.logging.LogFactory; |
11 |
import org.dom4j.Document; |
|
12 |
import org.dom4j.DocumentException; |
|
13 |
import org.dom4j.Element; |
|
14 |
import org.dom4j.io.SAXReader; |
|
9 | 15 |
import org.springframework.beans.factory.annotation.Autowired; |
16 |
import org.springframework.web.bind.annotation.RequestBody; |
|
10 | 17 |
import org.springframework.web.bind.annotation.RequestMapping; |
18 |
import org.springframework.web.bind.annotation.RequestMethod; |
|
11 | 19 |
import org.springframework.web.bind.annotation.RequestParam; |
12 | 20 |
import org.springframework.web.bind.annotation.RestController; |
13 | 21 |
|
22 |
import com.fasterxml.jackson.core.type.TypeReference; |
|
14 | 23 |
import com.fasterxml.jackson.databind.ObjectMapper; |
15 | 24 |
|
25 |
import eu.dnetlib.clients.is.InformationServiceClient; |
|
16 | 26 |
import eu.dnetlib.clients.msro.Workflow; |
17 | 27 |
import eu.dnetlib.clients.msro.WorkflowTemplate; |
18 | 28 |
import eu.dnetlib.enabling.annotations.DnetService; |
19 | 29 |
import eu.dnetlib.enabling.annotations.DnetServiceType; |
30 |
import eu.dnetlib.exceptions.InformationServiceException; |
|
20 | 31 |
import eu.dnetlib.msro.exceptions.MSROException; |
32 |
import eu.dnetlib.msro.workflows.GraphNodeParameter; |
|
21 | 33 |
import eu.dnetlib.msro.workflows.WorkflowRef; |
34 |
import eu.dnetlib.msro.workflows.util.GraphLoader; |
|
22 | 35 |
import eu.dnetlib.msro.workflows.util.WorkflowDispatcher; |
23 | 36 |
import eu.dnetlib.msro.workflows.util.logs.WorkflowLog; |
24 | 37 |
import eu.dnetlib.msro.workflows.util.logs.WorkflowLogRepository; |
... | ... | |
41 | 54 |
@Autowired |
42 | 55 |
private WorkflowLogRepository wfLogRepository; |
43 | 56 |
|
57 |
@Autowired |
|
58 |
private GraphLoader graphLoader; |
|
59 |
|
|
60 |
@Autowired |
|
61 |
private InformationServiceClient isClient; |
|
62 |
|
|
44 | 63 |
@RequestMapping("startWorkflow") |
45 | 64 |
public WorkflowRef startWorkflow( |
46 | 65 |
@RequestParam final String wfId, |
... | ... | |
52 | 71 |
|
53 | 72 |
} |
54 | 73 |
|
74 |
@RequestMapping(value = "startWorkflowTemplate", method = RequestMethod.GET) |
|
75 |
public WorkflowRef startWorkflowTemplate(@RequestParam final String parent, @RequestParam final String node) throws MSROException { |
|
76 |
|
|
77 |
try { |
|
78 |
final Document docParent = (new SAXReader()).read(new StringReader(isClient.getProfile(parent))); |
|
79 |
final String family = docParent.valueOf("//WORKFLOW_FAMILY"); |
|
80 |
final String dsId = docParent.valueOf("//DATASOURCE/@id"); |
|
81 |
final String iface = docParent.valueOf("//DATASOURCE/@interface"); |
|
82 |
|
|
83 |
final String workerId = docParent.valueOf("//RESOURCE_MANAGER/@value"); |
|
84 |
|
|
85 |
final Map<String, String> globalParams = new HashMap<>(); |
|
86 |
for (final Object o : docParent.selectNodes("//CONFIGURATION/PARAMETERS/PARAM")) { |
|
87 |
final Element p = (Element) o; |
|
88 |
globalParams.put(p.valueOf("@name"), p.getTextTrim()); |
|
89 |
} |
|
90 |
final Map<String, GraphNodeParameter> mapParams = |
|
91 |
graphLoader.calculateParamsForNode(docParent.selectSingleNode("//NODE[@name='" + node + "']"), globalParams); |
|
92 |
|
|
93 |
final String wfTemplateId = mapParams.get("wfTemplateId").getValue(); |
|
94 |
final Map<String, String> params = new HashMap<>(); |
|
95 |
|
|
96 |
final Map<String, GraphNodeParameter> unresolved = |
|
97 |
(new ObjectMapper()).readValue(mapParams.get("wfTemplateParams").getValue(), new TypeReference<Map<String, GraphNodeParameter>>() {}); |
|
98 |
|
|
99 |
for (final Map.Entry<String, GraphNodeParameter> entry : unresolved.entrySet()) { |
|
100 |
final String k = entry.getKey(); |
|
101 |
if (entry.getValue().isEnvParam()) { |
|
102 |
log.error("Error launching a template containing ENV parmaters"); |
|
103 |
throw new MSROException("Error launching a template containing ENV parmaters"); |
|
104 |
} else { |
|
105 |
params.put(k, entry.getValue().getValue()); |
|
106 |
} |
|
107 |
|
|
108 |
} |
|
109 |
|
|
110 |
final WorkflowTemplate wfTemplate = new WorkflowTemplate(wfTemplateId, parent, node, family, dsId, iface, params, workerId); |
|
111 |
|
|
112 |
return startWorkflowTemplate(wfTemplate); |
|
113 |
} catch (final IOException | InformationServiceException | DocumentException e) { |
|
114 |
log.error("Error executing wfTemplate: " + node, e); |
|
115 |
throw new MSROException("Error executing wfTemplate: " + node, e); |
|
116 |
} |
|
117 |
|
|
118 |
} |
|
119 |
|
|
120 |
@RequestMapping(value = "startWorkflowTemplate", method = RequestMethod.POST) |
|
121 |
public WorkflowRef startWorkflowTemplate(@RequestBody final WorkflowTemplate wf) throws MSROException { |
|
122 |
return dispatcher.startWorkflowTemplate(wf, getBaseUrl(), null); |
|
123 |
} |
|
124 |
|
|
55 | 125 |
@RequestMapping("startRepoHiWorkflow") |
56 | 126 |
public WorkflowRef startRepoHiWorkflow( |
57 | 127 |
@RequestParam final String wfId, |
... | ... | |
67 | 137 |
} |
68 | 138 |
|
69 | 139 |
@RequestMapping("logs") |
70 |
public List<WorkflowLog> startWorkflow(@RequestParam final String wfId) {
|
|
140 |
public List<WorkflowLog> logs(@RequestParam final String wfId) {
|
|
71 | 141 |
return wfLogRepository.findByProfileId(wfId); |
72 | 142 |
} |
73 | 143 |
|
modules/dnet-springboot-apps/trunk/dnet-msro-application/src/main/java/eu/dnetlib/msro/workflows/util/WorkflowDispatcher.java | ||
---|---|---|
124 | 124 |
final Map<String, String> params = new HashMap<>(); |
125 | 125 |
graphLoader.calculateParamsForNode(templNode, globalParams).forEach((k, v) -> params.put(k, v.getValue())); |
126 | 126 |
|
127 |
return startWorkflowTemplate(repoByeTemplateId, null, name, "REPO_BYE", dsId, iface, params, localBaseUrl, workerId, res -> { |
|
127 |
final WorkflowTemplate wfTemplate = new WorkflowTemplate(repoByeTemplateId, null, name, "REPO_BYE", dsId, iface, params, workerId); |
|
128 |
|
|
129 |
return startWorkflowTemplate(wfTemplate, localBaseUrl, res -> { |
|
128 | 130 |
if (res.getStatus() == AsyncMethodStatus.SUCCESS) { |
129 | 131 |
log.info("RepoBye completed, I delete the wf profile: " + wfId); |
130 | 132 |
isClient.deleteProfile(wfId); |
... | ... | |
176 | 178 |
return wf; |
177 | 179 |
} |
178 | 180 |
|
179 |
public WorkflowRef startWorkflowTemplate(final WorkflowTemplate wf, |
|
181 |
public WorkflowRef startWorkflowTemplate(final WorkflowTemplate wfTemplate,
|
|
180 | 182 |
final String localBaseUrl, |
181 | 183 |
final AsyncServerCallback callback) throws MSROException { |
182 |
return startWorkflowTemplate(wf.getId(), wf.getParent(), wf.getName(), wf.getFamily(), wf.getDsId(), wf.getIface(), wf.getParams(), localBaseUrl, |
|
183 |
wf.getWorkerId(), callback); |
|
184 |
} |
|
185 | 184 |
|
186 |
public WorkflowRef startWorkflowTemplate(final String profileId, |
|
187 |
final String parent, |
|
188 |
final String name, |
|
189 |
final String family, |
|
190 |
final String dsId, |
|
191 |
final String iface, |
|
192 |
final Map<String, String> params, |
|
193 |
final String localBaseUrl, |
|
194 |
final String workerId, |
|
195 |
final AsyncServerCallback callback) throws MSROException { |
|
196 |
|
|
197 | 185 |
try { |
198 |
final String profile = isClient.getProfile(profileId);
|
|
186 |
final String profile = isClient.getProfile(wfTemplate.getId());
|
|
199 | 187 |
final Document doc = new SAXReader().read(new StringReader(profile)); |
200 | 188 |
|
201 | 189 |
final Map<String, String> globalParams = new HashMap<String, String>(); |
202 | 190 |
for (final Object o : doc.selectNodes("//CONFIGURATION/PARAMETERS/PARAM")) { |
203 | 191 |
final Element p = (Element) o; |
204 | 192 |
final String pname = p.valueOf("@name"); |
205 |
if (StringUtils.isNotBlank(params.get(pname))) {
|
|
206 |
globalParams.put(pname, params.get(pname));
|
|
193 |
if (StringUtils.isNotBlank((wfTemplate.getParams().get(pname)))) {
|
|
194 |
globalParams.put(pname, wfTemplate.getParams().get(pname));
|
|
207 | 195 |
} else if (p.selectSingleNode("@default") != null) { |
208 | 196 |
globalParams.put(pname, p.valueOf("@default")); |
209 | 197 |
} else if (!StringUtils.equalsIgnoreCase(p.valueOf("@required"), "true")) { |
... | ... | |
214 | 202 |
} |
215 | 203 |
|
216 | 204 |
final WorkflowInstance wf = new WorkflowInstance(); |
217 |
wf.setName(name);
|
|
218 |
wf.setFamily(family);
|
|
219 |
wf.setDsId(dsId);
|
|
220 |
wf.setDsInterface(iface);
|
|
205 |
wf.setName(wfTemplate.getName());
|
|
206 |
wf.setFamily(wfTemplate.getFamily());
|
|
207 |
wf.setDsId(wfTemplate.getDsId());
|
|
208 |
wf.setDsInterface(wfTemplate.getIface());
|
|
221 | 209 |
wf.setGraph(graphLoader.loadGraph(doc, globalParams)); |
222 | 210 |
wf.setPriority(DEFAULT_PRIORITY); |
223 |
wf.setProfileId(profileId);
|
|
211 |
wf.setProfileId(wfTemplate.getId());
|
|
224 | 212 |
wf.setTemplate(true); |
225 | 213 |
wf.setGlobalParams(globalParams); |
226 |
wf.setParent(parent);
|
|
227 |
wf.setWorkerId(workerId);
|
|
214 |
wf.setParent(wfTemplate.getParent());
|
|
215 |
wf.setWorkerId(wfTemplate.getWorkerId());
|
|
228 | 216 |
|
229 | 217 |
return executeWf(wf, localBaseUrl, callback); |
230 | 218 |
} catch (final Exception e) { |
231 |
log.error("Error starting workflow template: " + profileId, e);
|
|
219 |
log.error("Error starting workflow template: " + wfTemplate.getId(), e);
|
|
232 | 220 |
throw new MSROException("Error starting workflow template", e); |
233 | 221 |
} |
234 | 222 |
} |
modules/dnet-springboot-apps/trunk/dnet-administration-uis/src/main/java/eu/dnetlib/administration/uis/modules/workflows/WorkflowsModule.java | ||
---|---|---|
227 | 227 |
} |
228 | 228 |
|
229 | 229 |
@RequestMapping("startWorkflowTemplate") |
230 |
public @ResponseBody boolean startWorkflowTemplate(
|
|
230 |
public @ResponseBody WorkflowRef startWorkflowTemplate(
|
|
231 | 231 |
@RequestParam final String node, |
232 |
@RequestParam final String parentWf) throws Exception {
|
|
232 |
@RequestParam final String parent) throws Exception { |
|
233 | 233 |
|
234 |
// TODO |
|
235 |
|
|
236 |
/* |
|
237 |
* final String profile = isClient.getProfile(parentWf); final Document doc = (new SAXReader()).read(new StringReader(profile)); |
|
238 |
* final String family = doc.valueOf("//WORKFLOW_FAMILY"); final int priority = |
|
239 |
* NumberUtils.toInt(doc.valueOf("//WORKFLOW_PRIORITY"), WorkflowsConstants.DEFAULT_WF_PRIORITY); final String dsId = |
|
240 |
* doc.valueOf("//DATASOURCE/@id"); final String iface = doc.valueOf("//DATASOURCE/@interface"); |
|
241 |
* |
|
242 |
* final Map<String, String> globalParams = new HashMap<String, String>(); for (final Object o : |
|
243 |
* doc.selectNodes("//CONFIGURATION/PARAMETERS/PARAM")) { final Element p = (Element) o; globalParams.put(p.valueOf("@name"), |
|
244 |
* p.getTextTrim()); } |
|
245 |
* |
|
246 |
* final Node n = doc.selectSingleNode("//NODE[@name='" + node + "']"); final Map<String, Object> params = GraphNode.newNode(node, |
|
247 |
* n.valueOf("@type"), graphLoader.calculateParamsForNode(node, globalParams)) .resolveParamsWithNoEnv(); |
|
248 |
* |
|
249 |
* if (!params.containsKey("wfTemplateId") || !(params.get("wfTemplateId") instanceof String) || StringUtils.isBlank((String) |
|
250 |
* params.get("wfTemplateId"))) { log.error("wfTemplateId is invalid or missing in profile " + parentWf); throw new |
|
251 |
* DnetGenericException("wfTemplateId is invalid or missing in profile " + parentWf); } |
|
252 |
* |
|
253 |
* if (params.containsKey("wfTemplateParams") && !(params.get("wfTemplateParams") instanceof Map)) { log.error( |
|
254 |
* "wfTemplateParams is invalid in profile " + parentWf); throw new DnetGenericException("wfTemplateParams is invalid in profile " + |
|
255 |
* parentWf); } |
|
256 |
* |
|
257 |
* final String wfTtemplateId = (String) params.get("wfTemplateId"); |
|
258 |
* |
|
259 |
* @SuppressWarnings("unchecked") final Map<String, String> wfTtemplateParams = (Map<String, String>) |
|
260 |
* params.get("wfTemplateParams"); |
|
261 |
* |
|
262 |
* final MsroClient msro = clientFactory.getClient(MsroClient.class); |
|
263 |
* |
|
264 |
* msro.startWorkflowTemplate(wfTtemplateId, parentWf, node, family, wfTtemplateParams, dsId, iface, null); |
|
265 |
*/ |
|
266 |
return true; |
|
234 |
return clientFactory.getClient(MsroClient.class).startWorkflowTemplate(node, parent); |
|
267 | 235 |
} |
268 | 236 |
|
269 | 237 |
@RequestMapping(value = "proc/{procId}", method = RequestMethod.GET) |
modules/dnet-springboot-apps/trunk/dnet-administration-uis/src/main/resources/static/html/wf/wf-sub-workflows.html | ||
---|---|---|
1 |
<wf-process-modal proc-id="currentProcId" visible="showProcModal"></wf-process-modal> |
|
1 |
<wf-process-modal proc-id="currentProcId" visible="showProcModal" worker="currentWorker"></wf-process-modal>
|
|
2 | 2 |
|
3 | 3 |
<div class="form-group"> |
4 | 4 |
<label class="col-sm-3 control-label">Sub workflows</label> |
modules/dnet-springboot-apps/trunk/dnet-administration-uis/src/main/resources/static/js/wfs/wfs.js | ||
---|---|---|
162 | 162 |
|
163 | 163 |
var url = ''; |
164 | 164 |
if (isTemplate) { |
165 |
url = '/ajax/wfs/startWorkflowTemplate?node=' + name + '&parentWf=' + parent;
|
|
165 |
url = '/ajax/wfs/startWorkflowTemplate?node=' + name + '&parent=' + parent; |
|
166 | 166 |
} else { |
167 | 167 |
url = '/ajax/wfs/startWorkflow?wfId=' + wfId; |
168 | 168 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workflows/nodes/LaunchWorkflowTemplateJobNode.java | ||
---|---|---|
10 | 10 |
|
11 | 11 |
import eu.dnetlib.clients.locators.ServiceClientFactory; |
12 | 12 |
import eu.dnetlib.clients.msro.MsroClient; |
13 |
import eu.dnetlib.clients.msro.WorkflowTemplate; |
|
13 | 14 |
import eu.dnetlib.msro.annotations.ProcessNode; |
14 | 15 |
import eu.dnetlib.msro.controllers.MsroWorkerController; |
15 | 16 |
import eu.dnetlib.msro.workflows.Arc; |
... | ... | |
52 | 53 |
public final void execute(final Token token) { |
53 | 54 |
try { |
54 | 55 |
final String family = process.getFamily(); |
55 |
final int priority = process.getPriority(); |
|
56 | 56 |
final String dsId = process.getDsId(); |
57 | 57 |
final String iface = process.getDsInterface(); |
58 | 58 |
|
59 | 59 |
final MsroClient msro = serviceClientFactory.getClient(MsroClient.class); |
60 | 60 |
|
61 |
msro.startWorkflowTemplate(getWfTemplateId(), process.getProfileId(), getNodeName(), family, wfTemplateParams, worker.getBaseUrl(), |
|
62 |
worker.getProfileId(), |
|
61 |
final WorkflowTemplate wf = |
|
62 |
new WorkflowTemplate(getWfTemplateId(), process.getProfileId(), getNodeName(), family, dsId, iface, wfTemplateParams, |
|
63 |
worker.getProfileId()); |
|
64 |
|
|
65 |
msro.startWorkflowTemplate(wf, worker.getBaseUrl(), |
|
63 | 66 |
new AsyncClientCallback() { |
64 | 67 |
|
65 | 68 |
@Override |
modules/dnet-springboot-apps/trunk/dnet-common-utils/src/main/java/eu/dnetlib/clients/msro/MsroClient.java | ||
---|---|---|
3 | 3 |
import java.io.StringWriter; |
4 | 4 |
import java.util.Arrays; |
5 | 5 |
import java.util.List; |
6 |
import java.util.Map; |
|
7 | 6 |
|
8 | 7 |
import org.apache.commons.lang3.StringUtils; |
9 | 8 |
import org.springframework.beans.factory.annotation.Autowired; |
... | ... | |
125 | 124 |
asyncClientUtils.invokeRemoteMethod(getBaseUrl(), "startWorkflow", localBaseUrl, wf, callback); |
126 | 125 |
} |
127 | 126 |
|
128 |
public void startWorkflowTemplate(final String wfTemplateId, |
|
129 |
final String parent, |
|
130 |
final String name, |
|
131 |
final String family, |
|
132 |
final Map<String, String> params, |
|
133 |
final String localBaseUrl, |
|
134 |
final String workerId, |
|
135 |
final AsyncClientCallback callback) { |
|
136 |
final WorkflowTemplate wf = new WorkflowTemplate(wfTemplateId, parent, name, family, params, workerId); |
|
137 |
asyncClientUtils.invokeRemoteMethod(getBaseUrl(), "startWorkflow", localBaseUrl, wf, callback); |
|
138 |
} |
|
139 |
|
|
140 |
public void startWorkflowTemplate(final String wfTemplateId, |
|
141 |
final String parent, |
|
142 |
final String name, |
|
143 |
final String family, |
|
144 |
final Map<String, String> params, |
|
145 |
final String dsId, |
|
146 |
final String ifaceId, |
|
147 |
final String localBaseUrl, |
|
148 |
final String workerId, |
|
149 |
final AsyncClientCallback callback) { |
|
150 |
final WorkflowTemplate wf = new WorkflowTemplate(wfTemplateId, parent, name, family, dsId, ifaceId, params, workerId); |
|
151 |
asyncClientUtils.invokeRemoteMethod(getBaseUrl(), "startWorkflow", localBaseUrl, wf, callback); |
|
152 |
} |
|
153 |
|
|
154 | 127 |
public List<WfLogEntry> listWfLogs(final String wfId) { |
155 | 128 |
final RestTemplate restTemplate = new RestTemplate(); |
156 | 129 |
final String url = getBaseUrl() + "/logs?wfId=" + wfId; |
157 | 130 |
return Arrays.asList(restTemplate.getForObject(url, WfLogEntry[].class)); |
158 | 131 |
} |
159 | 132 |
|
133 |
public WorkflowRef startWorkflowTemplate(final String node, final String parent) { |
|
134 |
final StringWriter url = new StringWriter(); |
|
135 |
url.append(getBaseUrl()); |
|
136 |
url.append("/startWorkflowTemplate?parent="); |
|
137 |
url.append(parent); |
|
138 |
url.append("&node="); |
|
139 |
url.append(node); |
|
140 |
return (new RestTemplate()).getForObject(url.toString(), WorkflowRef.class); |
|
141 |
} |
|
142 |
|
|
143 |
public void startWorkflowTemplate(final WorkflowTemplate wf, final String localBaseUrl, final AsyncClientCallback callback) { |
|
144 |
asyncClientUtils.invokeRemoteMethod(getBaseUrl(), "startWorkflowTemplate", localBaseUrl, wf, callback); |
|
145 |
} |
|
146 |
|
|
160 | 147 |
} |
Also available in: Unified diff
wfTemplates execution