1 |
26600
|
sandro.lab
|
package eu.dnetlib.msro.notification;
|
2 |
|
|
|
3 |
|
|
import java.io.StringReader;
|
4 |
|
|
import java.util.Map;
|
5 |
27587
|
claudio.at
|
import java.util.Set;
|
6 |
26600
|
sandro.lab
|
|
7 |
|
|
import javax.annotation.Resource;
|
8 |
|
|
|
9 |
|
|
import org.apache.commons.logging.Log;
|
10 |
|
|
import org.apache.commons.logging.LogFactory;
|
11 |
|
|
import org.dom4j.Document;
|
12 |
|
|
import org.dom4j.DocumentException;
|
13 |
|
|
import org.dom4j.Node;
|
14 |
|
|
import org.dom4j.io.SAXReader;
|
15 |
|
|
|
16 |
|
|
import com.google.common.base.Splitter;
|
17 |
|
|
import com.google.common.collect.Maps;
|
18 |
27587
|
claudio.at
|
import com.google.common.collect.Sets;
|
19 |
26600
|
sandro.lab
|
|
20 |
|
|
import eu.dnetlib.enabling.actions.AbstractSubscriptionAction;
|
21 |
|
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
|
22 |
|
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
23 |
|
|
import eu.dnetlib.enabling.tools.ServiceLocator;
|
24 |
|
|
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
|
25 |
|
|
|
26 |
|
|
public class WfDependencyLauncherNotificationHandler extends AbstractSubscriptionAction {
|
27 |
|
|
|
28 |
|
|
@Resource(name = "lookupLocator")
|
29 |
|
|
private ServiceLocator<ISLookUpService> lookupLocator;
|
30 |
|
|
|
31 |
|
|
@Resource
|
32 |
|
|
private WorkflowExecutor workflowExecutor;
|
33 |
|
|
|
34 |
|
|
@Resource
|
35 |
|
|
private EmailDispatcher emailDispatcher;
|
36 |
|
|
|
37 |
|
|
private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
|
38 |
|
|
|
39 |
|
|
@Override
|
40 |
|
|
public void notified(final String subscrId, final String topic, final String rsId, final String profile) {
|
41 |
|
|
|
42 |
|
|
final SAXReader reader = new SAXReader();
|
43 |
|
|
try {
|
44 |
|
|
final Document doc = reader.read(new StringReader(profile));
|
45 |
|
|
|
46 |
27587
|
claudio.at
|
final Set<String> emails = calculateEmails(rsId);
|
47 |
26600
|
sandro.lab
|
final String procId = doc.valueOf("//LAST_EXECUTION_ID");
|
48 |
|
|
final String wfName = doc.valueOf("//WORKFLOW_NAME");
|
49 |
|
|
final boolean success = doc.valueOf("//LAST_EXECUTION_STATUS").equals("SUCCESS");
|
50 |
|
|
final Map<String, String> responses = Maps.newHashMap();
|
51 |
|
|
|
52 |
|
|
for (Object o : doc.selectNodes("//LAST_EXECUTION_OUTPUT")) {
|
53 |
|
|
Node n = (Node) o;
|
54 |
|
|
responses.put(n.valueOf("@name"), n.getText());
|
55 |
|
|
}
|
56 |
|
|
|
57 |
|
|
if (!success) {
|
58 |
|
|
log.info("Last execution of " + rsId + " failed, dependencies NOT STARTED");
|
59 |
|
|
}
|
60 |
|
|
|
61 |
|
|
final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType')//WORKFLOW[@id='" + rsId
|
62 |
|
|
+ "']/WORKFLOW let $y := /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = $x/@id] "
|
63 |
|
|
+ "where $y//CONFIGURATION/@start != 'disabled' return concat ($x/@id , ' @@@ ', $x/@name , ' @@@ ', $y//CONFIGURATION/@start)";
|
64 |
|
|
|
65 |
|
|
try {
|
66 |
|
|
final Map<String, String> pendingWfs = Maps.newHashMap();
|
67 |
|
|
|
68 |
|
|
for (String s : lookupLocator.getService().quickSearchProfile(query)) {
|
69 |
|
|
final String[] arr = s.split("@@@");
|
70 |
|
|
final String id = arr[0].trim();
|
71 |
|
|
final String name = arr[1].trim();
|
72 |
|
|
final boolean manual = arr[2].trim().toLowerCase().equals("manual");
|
73 |
|
|
if (success && !manual) {
|
74 |
|
|
try {
|
75 |
|
|
String pid = workflowExecutor.startProcess(id);
|
76 |
|
|
log.info("PROC " + pid + " of WF " + id + " STARTED AS CHILD OF " + rsId);
|
77 |
|
|
} catch (Exception e) {
|
78 |
|
|
log.error("Error starting wf: " + id);
|
79 |
|
|
}
|
80 |
|
|
} else {
|
81 |
|
|
pendingWfs.put(id, name);
|
82 |
|
|
}
|
83 |
|
|
}
|
84 |
|
|
|
85 |
|
|
if (!emails.isEmpty()) {
|
86 |
|
|
if (success) {
|
87 |
|
|
emailDispatcher.sendSuccessMail(emails, rsId, procId, wfName, pendingWfs, responses);
|
88 |
|
|
} else {
|
89 |
|
|
final String error = doc.valueOf("//LAST_EXECUTION_ERROR");
|
90 |
|
|
emailDispatcher.sendFailedMail(emails, rsId, procId, wfName, pendingWfs, responses, error);
|
91 |
|
|
}
|
92 |
|
|
}
|
93 |
|
|
} catch (ISLookUpException e) {
|
94 |
|
|
log.error("Error executing xquery: " + query, e);
|
95 |
|
|
}
|
96 |
|
|
} catch (DocumentException e) {
|
97 |
|
|
log.error("Error parsing profile with id " + rsId + ": " + profile);
|
98 |
|
|
}
|
99 |
|
|
}
|
100 |
|
|
|
101 |
27587
|
claudio.at
|
private Set<String> calculateEmails(final String id) {
|
102 |
|
|
final Set<String> list = Sets.newHashSet();
|
103 |
26600
|
sandro.lab
|
try {
|
104 |
|
|
for (String val : lookupLocator.getService().quickSearchProfile("//ADMIN_EMAIL[..//WORKFLOW/@id='" + id + "']/text()")) {
|
105 |
|
|
for (String s : Splitter.on(",").trimResults().omitEmptyStrings().split(val)) {
|
106 |
|
|
list.add(s);
|
107 |
|
|
}
|
108 |
|
|
}
|
109 |
|
|
} catch (Exception e) {
|
110 |
|
|
log.error("Error searching email adresses", e);
|
111 |
|
|
}
|
112 |
|
|
return list;
|
113 |
|
|
}
|
114 |
|
|
}
|