Project

General

Profile

1
package eu.dnetlib.msro.notification;
2

    
3
import java.io.StringReader;
4
import java.util.Map;
5
import java.util.Set;
6

    
7
import javax.annotation.Resource;
8

    
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.dom4j.Document;
12
import org.dom4j.DocumentException;
13
import org.dom4j.Node;
14
import org.dom4j.io.SAXReader;
15

    
16
import com.google.common.base.Splitter;
17
import com.google.common.collect.Maps;
18
import com.google.common.collect.Sets;
19

    
20
import eu.dnetlib.enabling.actions.AbstractSubscriptionAction;
21
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
25

    
26
public class WfDependencyLauncherNotificationHandler extends AbstractSubscriptionAction {
27

    
28
	@Resource
29
	private UniqueServiceLocator serviceLocator;
30

    
31
	@Resource
32
	private WorkflowExecutor workflowExecutor;
33

    
34
	@Resource
35
	private EmailDispatcher emailDispatcher;
36

    
37
	private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
38

    
39
	@Override
40
	public void notified(final String subscrId, final String topic, final String rsId, final String profile) {
41

    
42
		final SAXReader reader = new SAXReader();
43
		try {
44
			final Document doc = reader.read(new StringReader(profile));
45

    
46
			final Set<String> emails = calculateEmails(rsId);
47
			final String procId = doc.valueOf("//LAST_EXECUTION_ID");
48
			final String wfName = doc.valueOf("//WORKFLOW_NAME");
49
			final boolean success = doc.valueOf("//LAST_EXECUTION_STATUS").equals("SUCCESS");
50
			final Map<String, String> responses = Maps.newHashMap();
51

    
52
			for (Object o : doc.selectNodes("//LAST_EXECUTION_OUTPUT")) {
53
				Node n = (Node) o;
54
				responses.put(n.valueOf("@name"), n.getText());
55
			}
56

    
57
			if (!success) {
58
				log.info("Last execution of " + rsId + " failed, dependencies NOT STARTED");
59
			}
60

    
61
			final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType')//WORKFLOW[@id='" + rsId
62
					+ "']/WORKFLOW let $y := /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = $x/@id] "
63
					+ "where $y//CONFIGURATION/@start != 'disabled' return concat ($x/@id , ' @@@ ', $x/@name , ' @@@ ', $y//CONFIGURATION/@start)";
64

    
65
			try {
66
				final Map<String, String> pendingWfs = Maps.newHashMap();
67

    
68
				for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
69
					final String[] arr = s.split("@@@");
70
					final String id = arr[0].trim();
71
					final String name = arr[1].trim();
72
					final boolean manual = arr[2].trim().toLowerCase().equals("manual");
73
					if (success && !manual) {
74
						try {
75
							String pid = workflowExecutor.startProcess(id);
76
							log.info("PROC " + pid + " of WF " + id + " STARTED AS CHILD OF " + rsId);
77
						} catch (Exception e) {
78
							log.error("Error starting wf: " + id);
79
						}
80
					} else {
81
						pendingWfs.put(id, name);
82
					}
83
				}
84

    
85
				if (!emails.isEmpty()) {
86
					if (success) {
87
						emailDispatcher.sendSuccessMail(emails, rsId, procId, wfName, pendingWfs, responses);
88
					} else {
89
						final String error = doc.valueOf("//LAST_EXECUTION_ERROR");
90
						emailDispatcher.sendFailedMail(emails, rsId, procId, wfName, pendingWfs, responses, error);
91
					}
92
				}
93
			} catch (ISLookUpException e) {
94
				log.error("Error executing xquery: " + query, e);
95
			}
96
		} catch (DocumentException e) {
97
			log.error("Error parsing profile with id " + rsId + ": " + profile);
98
		}
99
	}
100

    
101
	private Set<String> calculateEmails(final String id) {
102
		final Set<String> list = Sets.newHashSet();
103
		try {
104
			for (String val : serviceLocator.getService(ISLookUpService.class).quickSearchProfile("//ADMIN_EMAIL[..//WORKFLOW/@id='" + id + "']/text()")) {
105
				for (String s : Splitter.on(",").trimResults().omitEmptyStrings().split(val)) {
106
					list.add(s);
107
				}
108
			}
109
		} catch (Exception e) {
110
			log.error("Error searching email adresses", e);
111
		}
112
		return list;
113
	}
114
}
(2-2/2)