Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.msro.notification;
2
3
import java.io.StringReader;
4
import java.util.Map;
5 27587 claudio.at
import java.util.Set;
6 26600 sandro.lab
7
import javax.annotation.Resource;
8
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.dom4j.Document;
12
import org.dom4j.DocumentException;
13
import org.dom4j.Node;
14
import org.dom4j.io.SAXReader;
15
16
import com.google.common.base.Splitter;
17
import com.google.common.collect.Maps;
18 27587 claudio.at
import com.google.common.collect.Sets;
19 26600 sandro.lab
20
import eu.dnetlib.enabling.actions.AbstractSubscriptionAction;
21
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23 32639 michele.ar
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24 33199 andrea.man
import eu.dnetlib.msro.workflows.metawf.WorkflowStartModeEnum;
25 26600 sandro.lab
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
26
27
public class WfDependencyLauncherNotificationHandler extends AbstractSubscriptionAction {
28
29 32639 michele.ar
	@Resource
30
	private UniqueServiceLocator serviceLocator;
31 26600 sandro.lab
32
	@Resource
33
	private WorkflowExecutor workflowExecutor;
34
35
	@Resource
36
	private EmailDispatcher emailDispatcher;
37
38
	private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
39
40
	@Override
41
	public void notified(final String subscrId, final String topic, final String rsId, final String profile) {
42
43
		final SAXReader reader = new SAXReader();
44
		try {
45
			final Document doc = reader.read(new StringReader(profile));
46
47 27587 claudio.at
			final Set<String> emails = calculateEmails(rsId);
48 26600 sandro.lab
			final String procId = doc.valueOf("//LAST_EXECUTION_ID");
49
			final String wfName = doc.valueOf("//WORKFLOW_NAME");
50
			final boolean success = doc.valueOf("//LAST_EXECUTION_STATUS").equals("SUCCESS");
51
			final Map<String, String> responses = Maps.newHashMap();
52
53
			for (Object o : doc.selectNodes("//LAST_EXECUTION_OUTPUT")) {
54
				Node n = (Node) o;
55
				responses.put(n.valueOf("@name"), n.getText());
56
			}
57
58
			if (!success) {
59
				log.info("Last execution of " + rsId + " failed, dependencies NOT STARTED");
60
			}
61
62
			final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType')//WORKFLOW[@id='" + rsId
63
					+ "']/WORKFLOW let $y := /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = $x/@id] "
64
					+ "where $y//CONFIGURATION/@start != 'disabled' return concat ($x/@id , ' @@@ ', $x/@name , ' @@@ ', $y//CONFIGURATION/@start)";
65
66
			try {
67
				final Map<String, String> pendingWfs = Maps.newHashMap();
68
69 32639 michele.ar
				for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
70 26600 sandro.lab
					final String[] arr = s.split("@@@");
71
					final String id = arr[0].trim();
72
					final String name = arr[1].trim();
73 33199 andrea.man
					final boolean manual = arr[2].trim().equalsIgnoreCase(WorkflowStartModeEnum.manual.toString());
74 26600 sandro.lab
					if (success && !manual) {
75
						try {
76
							String pid = workflowExecutor.startProcess(id);
77
							log.info("PROC " + pid + " of WF " + id + " STARTED AS CHILD OF " + rsId);
78
						} catch (Exception e) {
79
							log.error("Error starting wf: " + id);
80
						}
81
					} else {
82
						pendingWfs.put(id, name);
83
					}
84
				}
85
86
				if (!emails.isEmpty()) {
87
					if (success) {
88
						emailDispatcher.sendSuccessMail(emails, rsId, procId, wfName, pendingWfs, responses);
89
					} else {
90
						final String error = doc.valueOf("//LAST_EXECUTION_ERROR");
91
						emailDispatcher.sendFailedMail(emails, rsId, procId, wfName, pendingWfs, responses, error);
92
					}
93
				}
94
			} catch (ISLookUpException e) {
95
				log.error("Error executing xquery: " + query, e);
96
			}
97
		} catch (DocumentException e) {
98
			log.error("Error parsing profile with id " + rsId + ": " + profile);
99
		}
100
	}
101
102 27587 claudio.at
	private Set<String> calculateEmails(final String id) {
103
		final Set<String> list = Sets.newHashSet();
104 26600 sandro.lab
		try {
105 32639 michele.ar
			for (String val : serviceLocator.getService(ISLookUpService.class).quickSearchProfile("//ADMIN_EMAIL[..//WORKFLOW/@id='" + id + "']/text()")) {
106 26600 sandro.lab
				for (String s : Splitter.on(",").trimResults().omitEmptyStrings().split(val)) {
107
					list.add(s);
108
				}
109
			}
110
		} catch (Exception e) {
111
			log.error("Error searching email adresses", e);
112
		}
113
		return list;
114
	}
115
}