Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.msro.notification;
2
3
import java.io.StringReader;
4
import java.util.Map;
5 27587 claudio.at
import java.util.Set;
6 26600 sandro.lab
7
import javax.annotation.Resource;
8
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.dom4j.Document;
12
import org.dom4j.DocumentException;
13
import org.dom4j.Node;
14
import org.dom4j.io.SAXReader;
15
16
import com.google.common.base.Splitter;
17
import com.google.common.collect.Maps;
18 27587 claudio.at
import com.google.common.collect.Sets;
19 26600 sandro.lab
20
import eu.dnetlib.enabling.actions.AbstractSubscriptionAction;
21
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23
import eu.dnetlib.enabling.tools.ServiceLocator;
24
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
25
26
public class WfDependencyLauncherNotificationHandler extends AbstractSubscriptionAction {
27
28
	@Resource(name = "lookupLocator")
29
	private ServiceLocator<ISLookUpService> lookupLocator;
30
31
	@Resource
32
	private WorkflowExecutor workflowExecutor;
33
34
	@Resource
35
	private EmailDispatcher emailDispatcher;
36
37
	private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
38
39
	@Override
40
	public void notified(final String subscrId, final String topic, final String rsId, final String profile) {
41
42
		final SAXReader reader = new SAXReader();
43
		try {
44
			final Document doc = reader.read(new StringReader(profile));
45
46 27587 claudio.at
			final Set<String> emails = calculateEmails(rsId);
47 26600 sandro.lab
			final String procId = doc.valueOf("//LAST_EXECUTION_ID");
48
			final String wfName = doc.valueOf("//WORKFLOW_NAME");
49
			final boolean success = doc.valueOf("//LAST_EXECUTION_STATUS").equals("SUCCESS");
50
			final Map<String, String> responses = Maps.newHashMap();
51
52
			for (Object o : doc.selectNodes("//LAST_EXECUTION_OUTPUT")) {
53
				Node n = (Node) o;
54
				responses.put(n.valueOf("@name"), n.getText());
55
			}
56
57
			if (!success) {
58
				log.info("Last execution of " + rsId + " failed, dependencies NOT STARTED");
59
			}
60
61
			final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType')//WORKFLOW[@id='" + rsId
62
					+ "']/WORKFLOW let $y := /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = $x/@id] "
63
					+ "where $y//CONFIGURATION/@start != 'disabled' return concat ($x/@id , ' @@@ ', $x/@name , ' @@@ ', $y//CONFIGURATION/@start)";
64
65
			try {
66
				final Map<String, String> pendingWfs = Maps.newHashMap();
67
68
				for (String s : lookupLocator.getService().quickSearchProfile(query)) {
69
					final String[] arr = s.split("@@@");
70
					final String id = arr[0].trim();
71
					final String name = arr[1].trim();
72
					final boolean manual = arr[2].trim().toLowerCase().equals("manual");
73
					if (success && !manual) {
74
						try {
75
							String pid = workflowExecutor.startProcess(id);
76
							log.info("PROC " + pid + " of WF " + id + " STARTED AS CHILD OF " + rsId);
77
						} catch (Exception e) {
78
							log.error("Error starting wf: " + id);
79
						}
80
					} else {
81
						pendingWfs.put(id, name);
82
					}
83
				}
84
85
				if (!emails.isEmpty()) {
86
					if (success) {
87
						emailDispatcher.sendSuccessMail(emails, rsId, procId, wfName, pendingWfs, responses);
88
					} else {
89
						final String error = doc.valueOf("//LAST_EXECUTION_ERROR");
90
						emailDispatcher.sendFailedMail(emails, rsId, procId, wfName, pendingWfs, responses, error);
91
					}
92
				}
93
			} catch (ISLookUpException e) {
94
				log.error("Error executing xquery: " + query, e);
95
			}
96
		} catch (DocumentException e) {
97
			log.error("Error parsing profile with id " + rsId + ": " + profile);
98
		}
99
	}
100
101 27587 claudio.at
	private Set<String> calculateEmails(final String id) {
102
		final Set<String> list = Sets.newHashSet();
103 26600 sandro.lab
		try {
104
			for (String val : lookupLocator.getService().quickSearchProfile("//ADMIN_EMAIL[..//WORKFLOW/@id='" + id + "']/text()")) {
105
				for (String s : Splitter.on(",").trimResults().omitEmptyStrings().split(val)) {
106
					list.add(s);
107
				}
108
			}
109
		} catch (Exception e) {
110
			log.error("Error searching email adresses", e);
111
		}
112
		return list;
113
	}
114
}