Revision 32748
Added by Andrea Mannocci about 10 years ago
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/deploy.info | ||
---|---|---|
1 |
{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-msro-service/trunk/", "deploy_repository": "dnet4-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet4-snapshots", "name": "dnet-msro-service"} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/test/java/eu/dnetlib/AppTest.java | ||
---|---|---|
1 |
package eu.dnetlib; |
|
2 |
|
|
3 |
import java.io.StringWriter; |
|
4 |
|
|
5 |
import org.junit.Test; |
|
6 |
import org.w3c.dom.Node; |
|
7 |
import org.w3c.dom.NodeList; |
|
8 |
|
|
9 |
import eu.dnetlib.miscutils.functional.xml.ApplyXslt; |
|
10 |
|
|
11 |
/** |
|
12 |
* Unit test for simple App. |
|
13 |
*/ |
|
14 |
public class AppTest { |
|
15 |
|
|
16 |
@Test |
|
17 |
public void testXslt() { |
|
18 |
String res = (new ApplyXslt(getXslt())).evaluate(getXml()); |
|
19 |
System.out.println("******************"); |
|
20 |
System.out.println(res); |
|
21 |
System.out.println("******************"); |
|
22 |
} |
|
23 |
|
|
24 |
|
|
25 |
public static String javaMethod(NodeList list) { |
|
26 |
System.out.println("******************"); |
|
27 |
System.out.println("TYPE : " + list.toString()); |
|
28 |
System.out.println("LENGTH : " + list.getLength()); |
|
29 |
for (int i = 0; i<list.getLength(); i++) { |
|
30 |
Node node = list.item(i); |
|
31 |
|
|
32 |
System.out.println("ELEM " + i + ": " + node.getLocalName()); |
|
33 |
} |
|
34 |
|
|
35 |
System.out.println("******************"); |
|
36 |
return "SUCA"; |
|
37 |
} |
|
38 |
|
|
39 |
|
|
40 |
private String getXml() { |
|
41 |
final StringWriter sw = new StringWriter(); |
|
42 |
sw.append("<?xml version='1.0' encoding='UTF-8'?>\n"); |
|
43 |
sw.append("<record>\n"); |
|
44 |
sw.append("<metadata>\n"); |
|
45 |
sw.append("<a>A text value</a>\n"); |
|
46 |
sw.append("<b attr='attribute value'/>\n"); |
|
47 |
sw.append("</metadata>\n"); |
|
48 |
sw.append("</record>\n"); |
|
49 |
|
|
50 |
return sw.toString(); |
|
51 |
} |
|
52 |
|
|
53 |
private String getXslt() { |
|
54 |
final StringWriter sw = new StringWriter(); |
|
55 |
|
|
56 |
sw.append("<?xml version='1.0' encoding='UTF-8'?>\n"); |
|
57 |
sw.append("<xsl:stylesheet version='1.0' xmlns:xsl='http://www.w3.org/1999/XSL/Transform' xmlns:dnet='eu.dnetlib.AppTest' exclude-result-prefixes='xsl dnet'>\n"); |
|
58 |
sw.append("<xsl:output omit-xml-declaration='yes' indent='yes'/>\n"); |
|
59 |
sw.append("<xsl:template match='/*'>\n"); |
|
60 |
sw.append("<xsl:variable name='metadata' select=\"//*[local-name()='metadata']/*\" />\n"); |
|
61 |
sw.append("<ROWS>\n"); |
|
62 |
sw.append("<xsl:value-of select='dnet:javaMethod($metadata)'/>\n"); |
|
63 |
sw.append("</ROWS>\n"); |
|
64 |
sw.append("</xsl:template>\n"); |
|
65 |
sw.append("</xsl:stylesheet>\n"); |
|
66 |
return sw.toString(); |
|
67 |
} |
|
68 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/cron/ScheduledWorkflowLauncher.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.cron; |
|
2 |
|
|
3 |
import java.util.Date; |
|
4 |
|
|
5 |
import javax.annotation.Resource; |
|
6 |
|
|
7 |
import org.apache.commons.lang.StringUtils; |
|
8 |
import org.apache.commons.lang.math.NumberUtils; |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.quartz.CronExpression; |
|
12 |
import org.springframework.beans.factory.annotation.Required; |
|
13 |
|
|
14 |
import com.googlecode.sarasvati.GraphProcess; |
|
15 |
|
|
16 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
17 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
18 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
19 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
20 |
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor; |
|
21 |
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry; |
|
22 |
|
|
23 |
public class ScheduledWorkflowLauncher { |
|
24 |
|
|
25 |
private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class); |
|
26 |
|
|
27 |
private static final DateUtils dateUtils = new DateUtils(); |
|
28 |
|
|
29 |
private WorkflowExecutor workflowExecutor; |
|
30 |
|
|
31 |
private GraphProcessRegistry graphProcessRegistry; |
|
32 |
|
|
33 |
private int windowSize = 7200000; // 2 hours |
|
34 |
|
|
35 |
@Resource |
|
36 |
private UniqueServiceLocator serviceLocator; |
|
37 |
|
|
38 |
public void verifySheduledWorkflows() { |
|
39 |
log.debug("Verifying scheduled workflows - START"); |
|
40 |
|
|
41 |
final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') " + |
|
42 |
"where $x//CONFIGURATION/@status='EXECUTABLE' " + |
|
43 |
"and $x//SCHEDULING/@enabled='true' " + |
|
44 |
"return concat($x//RESOURCE_IDENTIFIER/@value, ' @@@ ', $x//SCHEDULING/CRON, ' @@@ ', $x//SCHEDULING/MININTERVAL)"; |
|
45 |
|
|
46 |
try { |
|
47 |
for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) { |
|
48 |
final String[] arr = s.split("@@@"); |
|
49 |
final String id = arr[0].trim(); |
|
50 |
final String cron = arr[1].trim(); |
|
51 |
final int minInterval = NumberUtils.toInt(arr[2].trim(), 0) * 60000; // MINUTES to MILLIS |
|
52 |
|
|
53 |
if (CronExpression.isValidExpression(cron)) { |
|
54 |
final Date now = new Date(); |
|
55 |
final Date last = calculateLastExecutionDate(id); |
|
56 |
final int elapsed = Math.abs(Math.round(now.getTime() - last.getTime())); |
|
57 |
|
|
58 |
if (log.isDebugEnabled()) { |
|
59 |
log.debug("**************************************************************"); |
|
60 |
log.debug("META WORKFLOW ID : " + id); |
|
61 |
log.debug("NOW : " + now); |
|
62 |
log.debug("LAST EXECUTION DATE: " + last); |
|
63 |
log.debug("MIN INTERVAL : " + minInterval); |
|
64 |
log.debug("WINDOW SIZE : " + windowSize); |
|
65 |
log.debug("TIME ELAPSED : " + elapsed); |
|
66 |
} |
|
67 |
|
|
68 |
if (elapsed > minInterval && isFired(cron, last, now) && !isAlreadyRunning(id)) { |
|
69 |
log.debug("MUST BE EXECUTED : true"); |
|
70 |
try { |
|
71 |
workflowExecutor.startMetaWorkflow(id, false); |
|
72 |
} catch (Exception e) { |
|
73 |
log.error("Error launching scheduled wf: " + id, e); |
|
74 |
} |
|
75 |
} else { |
|
76 |
log.debug("MUST BE EXECUTED : false"); |
|
77 |
} |
|
78 |
log.debug("**************************************************************"); |
|
79 |
} |
|
80 |
} |
|
81 |
} catch (ISLookUpException e) { |
|
82 |
log.error("Error executing query " + query); |
|
83 |
} |
|
84 |
|
|
85 |
log.debug("Verifying scheduled workflows - END"); |
|
86 |
} |
|
87 |
|
|
88 |
private boolean isFired(final String cronExpression, final Date startDate, final Date now) { |
|
89 |
try { |
|
90 |
final CronExpression cron = new CronExpression(cronExpression); |
|
91 |
|
|
92 |
final Date prev = new Date(now.getTime() - windowSize); |
|
93 |
final Date date = prev.getTime() < startDate.getTime() ? startDate : prev; |
|
94 |
final Date next = cron.getNextValidTimeAfter(date); |
|
95 |
|
|
96 |
if (log.isDebugEnabled()) { |
|
97 |
log.debug("NEXT EXECUTION DATE: " + next); |
|
98 |
log.debug("FIRED : " + (next.getTime() < now.getTime())); |
|
99 |
} |
|
100 |
return next.getTime() < now.getTime(); |
|
101 |
} catch (Exception e) { |
|
102 |
log.error("Error calculating next cron event: " + cronExpression, e); |
|
103 |
return false; |
|
104 |
} |
|
105 |
} |
|
106 |
|
|
107 |
private boolean isAlreadyRunning(final String metaWfId) { |
|
108 |
final String query = "document('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType/" + StringUtils.substringBefore(metaWfId, "_") |
|
109 |
+ "')//WORKFLOW/@id/string()"; |
|
110 |
|
|
111 |
try { |
|
112 |
for (String profileId : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) { |
|
113 |
if (profileId.length() > 0) { |
|
114 |
for (GraphProcess p : graphProcessRegistry.findProcessesByResource(profileId)) { |
|
115 |
switch (p.getState()) { |
|
116 |
case Created: |
|
117 |
return true; |
|
118 |
case Executing: |
|
119 |
return true; |
|
120 |
default: |
|
121 |
break; |
|
122 |
} |
|
123 |
} |
|
124 |
} |
|
125 |
} |
|
126 |
} catch (ISLookUpException e) { |
|
127 |
log.error("Error executing query " + query); |
|
128 |
} |
|
129 |
return false; |
|
130 |
} |
|
131 |
|
|
132 |
private Date calculateLastExecutionDate(final String id) { |
|
133 |
final String query = "for $id in document('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType/" + StringUtils.substringBefore(id, "_") |
|
134 |
+ "')//WORKFLOW/@id/string() " + |
|
135 |
"for $x in document(concat('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType/', substring-before($id, '_'))) " + |
|
136 |
"where $x//LAST_EXECUTION_STATUS = 'SUCCESS' " + |
|
137 |
"return $x//LAST_EXECUTION_DATE/text() "; |
|
138 |
|
|
139 |
long time = 0; |
|
140 |
try { |
|
141 |
for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) { |
|
142 |
if (s.length() > 0) { |
|
143 |
final Date d = dateUtils.parse(s); |
|
144 |
if (time < d.getTime()) { |
|
145 |
time = d.getTime(); |
|
146 |
} |
|
147 |
} |
|
148 |
} |
|
149 |
} catch (ISLookUpException e) { |
|
150 |
log.error("Error executing query " + query); |
|
151 |
} catch (Exception e) { |
|
152 |
log.error("Error calculating date", e); |
|
153 |
} |
|
154 |
return new Date(time); |
|
155 |
} |
|
156 |
|
|
157 |
public WorkflowExecutor getWorkflowExecutor() { |
|
158 |
return workflowExecutor; |
|
159 |
} |
|
160 |
|
|
161 |
@Required |
|
162 |
public void setWorkflowExecutor(final WorkflowExecutor workflowExecutor) { |
|
163 |
this.workflowExecutor = workflowExecutor; |
|
164 |
} |
|
165 |
|
|
166 |
public GraphProcessRegistry getGraphProcessRegistry() { |
|
167 |
return graphProcessRegistry; |
|
168 |
} |
|
169 |
|
|
170 |
@Required |
|
171 |
public void setGraphProcessRegistry(final GraphProcessRegistry graphProcessRegistry) { |
|
172 |
this.graphProcessRegistry = graphProcessRegistry; |
|
173 |
} |
|
174 |
|
|
175 |
public int getWindowSize() { |
|
176 |
return windowSize; |
|
177 |
} |
|
178 |
|
|
179 |
@Required |
|
180 |
public void setWindowSize(final int windowSize) { |
|
181 |
this.windowSize = windowSize; |
|
182 |
} |
|
183 |
|
|
184 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/MSROServiceImpl.java | ||
---|---|---|
1 |
package eu.dnetlib.msro; |
|
2 |
|
|
3 |
import javax.jws.WebService; |
|
4 |
|
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
7 |
import org.springframework.beans.factory.annotation.Required; |
|
8 |
|
|
9 |
import eu.dnetlib.enabling.tools.AbstractBaseService; |
|
10 |
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler; |
|
11 |
import eu.dnetlib.msro.rmi.MSROService; |
|
12 |
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry; |
|
13 |
|
|
14 |
@WebService(targetNamespace = "http://services.dnetlib.eu/") |
|
15 |
public class MSROServiceImpl extends AbstractBaseService implements MSROService { |
|
16 |
|
|
17 |
/** |
|
18 |
* logger. |
|
19 |
*/ |
|
20 |
private static final Log log = LogFactory.getLog(MSROServiceImpl.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
21 |
|
|
22 |
/** |
|
23 |
* notification handler. |
|
24 |
*/ |
|
25 |
private NotificationHandler notificationHandler; |
|
26 |
|
|
27 |
/** |
|
28 |
* graph process registry. |
|
29 |
*/ |
|
30 |
private GraphProcessRegistry processRegistry; |
|
31 |
|
|
32 |
@Override |
|
33 |
public void notify(final String subscriptionId, final String topic, final String isId, final String message) { |
|
34 |
super.notify(subscriptionId, topic, isId, message); |
|
35 |
|
|
36 |
log.debug("got notification: " + topic); |
|
37 |
|
|
38 |
getNotificationHandler().notified(subscriptionId, topic, isId, message); |
|
39 |
} |
|
40 |
|
|
41 |
@Required |
|
42 |
public void setNotificationHandler(final NotificationHandler notHandler) { |
|
43 |
this.notificationHandler = notHandler; |
|
44 |
} |
|
45 |
|
|
46 |
public NotificationHandler getNotificationHandler() { |
|
47 |
return notificationHandler; |
|
48 |
} |
|
49 |
|
|
50 |
@Required |
|
51 |
public void setProcessRegistry(final GraphProcessRegistry processRegistry) { |
|
52 |
this.processRegistry = processRegistry; |
|
53 |
} |
|
54 |
|
|
55 |
public GraphProcessRegistry getProcessRegistry() { |
|
56 |
return processRegistry; |
|
57 |
} |
|
58 |
|
|
59 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/notification/WfDependencyLauncherNotificationHandler.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.notification; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.util.Map; |
|
5 |
import java.util.Set; |
|
6 |
|
|
7 |
import javax.annotation.Resource; |
|
8 |
|
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.dom4j.Document; |
|
12 |
import org.dom4j.DocumentException; |
|
13 |
import org.dom4j.Node; |
|
14 |
import org.dom4j.io.SAXReader; |
|
15 |
|
|
16 |
import com.google.common.base.Splitter; |
|
17 |
import com.google.common.collect.Maps; |
|
18 |
import com.google.common.collect.Sets; |
|
19 |
|
|
20 |
import eu.dnetlib.enabling.actions.AbstractSubscriptionAction; |
|
21 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
22 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
23 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
24 |
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor; |
|
25 |
|
|
26 |
public class WfDependencyLauncherNotificationHandler extends AbstractSubscriptionAction { |
|
27 |
|
|
28 |
@Resource |
|
29 |
private UniqueServiceLocator serviceLocator; |
|
30 |
|
|
31 |
@Resource |
|
32 |
private WorkflowExecutor workflowExecutor; |
|
33 |
|
|
34 |
@Resource |
|
35 |
private EmailDispatcher emailDispatcher; |
|
36 |
|
|
37 |
private static final Log log = LogFactory.getLog(WorkflowExecutor.class); |
|
38 |
|
|
39 |
@Override |
|
40 |
public void notified(final String subscrId, final String topic, final String rsId, final String profile) { |
|
41 |
|
|
42 |
final SAXReader reader = new SAXReader(); |
|
43 |
try { |
|
44 |
final Document doc = reader.read(new StringReader(profile)); |
|
45 |
|
|
46 |
final Set<String> emails = calculateEmails(rsId); |
|
47 |
final String procId = doc.valueOf("//LAST_EXECUTION_ID"); |
|
48 |
final String wfName = doc.valueOf("//WORKFLOW_NAME"); |
|
49 |
final boolean success = doc.valueOf("//LAST_EXECUTION_STATUS").equals("SUCCESS"); |
|
50 |
final Map<String, String> responses = Maps.newHashMap(); |
|
51 |
|
|
52 |
for (Object o : doc.selectNodes("//LAST_EXECUTION_OUTPUT")) { |
|
53 |
Node n = (Node) o; |
|
54 |
responses.put(n.valueOf("@name"), n.getText()); |
|
55 |
} |
|
56 |
|
|
57 |
if (!success) { |
|
58 |
log.info("Last execution of " + rsId + " failed, dependencies NOT STARTED"); |
|
59 |
} |
|
60 |
|
|
61 |
final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType')//WORKFLOW[@id='" + rsId |
|
62 |
+ "']/WORKFLOW let $y := /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = $x/@id] " |
|
63 |
+ "where $y//CONFIGURATION/@start != 'disabled' return concat ($x/@id , ' @@@ ', $x/@name , ' @@@ ', $y//CONFIGURATION/@start)"; |
|
64 |
|
|
65 |
try { |
|
66 |
final Map<String, String> pendingWfs = Maps.newHashMap(); |
|
67 |
|
|
68 |
for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) { |
|
69 |
final String[] arr = s.split("@@@"); |
|
70 |
final String id = arr[0].trim(); |
|
71 |
final String name = arr[1].trim(); |
|
72 |
final boolean manual = arr[2].trim().toLowerCase().equals("manual"); |
|
73 |
if (success && !manual) { |
|
74 |
try { |
|
75 |
String pid = workflowExecutor.startProcess(id); |
|
76 |
log.info("PROC " + pid + " of WF " + id + " STARTED AS CHILD OF " + rsId); |
|
77 |
} catch (Exception e) { |
|
78 |
log.error("Error starting wf: " + id); |
|
79 |
} |
|
80 |
} else { |
|
81 |
pendingWfs.put(id, name); |
|
82 |
} |
|
83 |
} |
|
84 |
|
|
85 |
if (!emails.isEmpty()) { |
|
86 |
if (success) { |
|
87 |
emailDispatcher.sendSuccessMail(emails, rsId, procId, wfName, pendingWfs, responses); |
|
88 |
} else { |
|
89 |
final String error = doc.valueOf("//LAST_EXECUTION_ERROR"); |
|
90 |
emailDispatcher.sendFailedMail(emails, rsId, procId, wfName, pendingWfs, responses, error); |
|
91 |
} |
|
92 |
} |
|
93 |
} catch (ISLookUpException e) { |
|
94 |
log.error("Error executing xquery: " + query, e); |
|
95 |
} |
|
96 |
} catch (DocumentException e) { |
|
97 |
log.error("Error parsing profile with id " + rsId + ": " + profile); |
|
98 |
} |
|
99 |
} |
|
100 |
|
|
101 |
private Set<String> calculateEmails(final String id) { |
|
102 |
final Set<String> list = Sets.newHashSet(); |
|
103 |
try { |
|
104 |
for (String val : serviceLocator.getService(ISLookUpService.class).quickSearchProfile("//ADMIN_EMAIL[..//WORKFLOW/@id='" + id + "']/text()")) { |
|
105 |
for (String s : Splitter.on(",").trimResults().omitEmptyStrings().split(val)) { |
|
106 |
list.add(s); |
|
107 |
} |
|
108 |
} |
|
109 |
} catch (Exception e) { |
|
110 |
log.error("Error searching email adresses", e); |
|
111 |
} |
|
112 |
return list; |
|
113 |
} |
|
114 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/notification/EmailDispatcher.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.notification; |
|
2 |
|
|
3 |
import java.util.Arrays; |
|
4 |
import java.util.Date; |
|
5 |
import java.util.Map; |
|
6 |
import java.util.Properties; |
|
7 |
import java.util.Set; |
|
8 |
import java.util.concurrent.BlockingQueue; |
|
9 |
import java.util.concurrent.LinkedBlockingQueue; |
|
10 |
|
|
11 |
import javax.mail.Authenticator; |
|
12 |
import javax.mail.Message; |
|
13 |
import javax.mail.MessagingException; |
|
14 |
import javax.mail.PasswordAuthentication; |
|
15 |
import javax.mail.Session; |
|
16 |
import javax.mail.Transport; |
|
17 |
import javax.mail.internet.InternetAddress; |
|
18 |
import javax.mail.internet.MimeMessage; |
|
19 |
|
|
20 |
import org.antlr.stringtemplate.StringTemplate; |
|
21 |
import org.apache.commons.io.IOUtils; |
|
22 |
import org.apache.commons.logging.Log; |
|
23 |
import org.apache.commons.logging.LogFactory; |
|
24 |
import org.springframework.beans.factory.annotation.Required; |
|
25 |
|
|
26 |
import com.google.common.base.Splitter; |
|
27 |
import com.google.common.collect.Maps; |
|
28 |
|
|
29 |
public class EmailDispatcher { |
|
30 |
|
|
31 |
private String from; |
|
32 |
private String fromName; |
|
33 |
private String cc; |
|
34 |
private String smtpHost; |
|
35 |
private int smtpPort = 587; |
|
36 |
private String smtpUser; |
|
37 |
private String smtpPassword; |
|
38 |
private String baseUrl; |
|
39 |
|
|
40 |
private static final Log log = LogFactory.getLog(EmailDispatcher.class); |
|
41 |
|
|
42 |
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>(); |
|
43 |
|
|
44 |
public void sendMail(final Set<String> to, final String subject, final String template, final Map<String, Object> tmplParams) { |
|
45 |
try { |
|
46 |
final StringTemplate st = new StringTemplate(template); |
|
47 |
st.setAttributes(tmplParams); |
|
48 |
st.setAttribute("baseUrl", baseUrl); |
|
49 |
|
|
50 |
final Session session = Session.getInstance(obtainProperties(), obtainAuthenticator()); |
|
51 |
|
|
52 |
final MimeMessage message = new MimeMessage(session); |
|
53 |
message.setFrom(new InternetAddress(from, fromName)); |
|
54 |
message.setSubject(subject); |
|
55 |
message.setContent(st.toString(), "text/html; charset=utf-8"); |
|
56 |
message.setSentDate(new Date()); |
|
57 |
|
|
58 |
for (String s : to) { |
|
59 |
message.addRecipient(Message.RecipientType.TO, new InternetAddress(s)); |
|
60 |
} |
|
61 |
if ((cc != null) && !cc.isEmpty()) { |
|
62 |
for (String aCC : Splitter.on(",").omitEmptyStrings().trimResults().split(getCc())) { |
|
63 |
message.addRecipient(Message.RecipientType.CC, new InternetAddress(aCC)); |
|
64 |
} |
|
65 |
} |
|
66 |
|
|
67 |
queue.add(message); |
|
68 |
|
|
69 |
log.info("Mail to " + Arrays.toString(to.toArray()) + " in queue"); |
|
70 |
} catch (Exception e) { |
|
71 |
log.error("Error sending mail", e); |
|
72 |
} |
|
73 |
} |
|
74 |
|
|
75 |
public void processMailQueue() { |
|
76 |
while (true) { |
|
77 |
final Message message = queue.poll(); |
|
78 |
if (message == null) { |
|
79 |
return; |
|
80 |
} else { |
|
81 |
try { |
|
82 |
log.info("Sending mail..."); |
|
83 |
Transport.send(message); |
|
84 |
log.info("...sent"); |
|
85 |
} catch (MessagingException e) { |
|
86 |
log.error("Error sending email", e); |
|
87 |
queue.add(message); |
|
88 |
return; |
|
89 |
} |
|
90 |
} |
|
91 |
} |
|
92 |
} |
|
93 |
|
|
94 |
private void sendWfStatusMail(final boolean success, |
|
95 |
final Set<String> to, |
|
96 |
final String wfId, |
|
97 |
final String procId, |
|
98 |
final String wfName, |
|
99 |
final Map<String, String> pendingWfs, |
|
100 |
final Map<String, String> responses, |
|
101 |
final String error) { |
|
102 |
try { |
|
103 |
final Map<String, Object> map = Maps.newHashMap(); |
|
104 |
map.put("wfId", wfId); |
|
105 |
map.put("wfName", wfName); |
|
106 |
map.put("procId", procId); |
|
107 |
if ((pendingWfs != null) && !pendingWfs.isEmpty()) { |
|
108 |
map.put("pendingWfs", pendingWfs); |
|
109 |
} |
|
110 |
if ((responses != null) && !responses.isEmpty()) { |
|
111 |
map.put("responses", responses); |
|
112 |
} |
|
113 |
if ((error != null) && !error.isEmpty()) { |
|
114 |
map.put("error", error); |
|
115 |
} |
|
116 |
|
|
117 |
final String subject = success ? "Workflow '" + wfName + "' has been completed successfully" : "Workflow '" + wfName + "' is failed"; |
|
118 |
final String tmplName = success ? "wf_success.mail.st" : "wf_failed.mail.st"; |
|
119 |
final String template = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/msro/mail/" + tmplName)); |
|
120 |
|
|
121 |
sendMail(to, subject, template, map); |
|
122 |
} catch (Exception e) { |
|
123 |
log.error("Error generating success-mail", e); |
|
124 |
} |
|
125 |
} |
|
126 |
|
|
127 |
public void sendSuccessMail(final Set<String> to, |
|
128 |
final String wfId, |
|
129 |
final String procId, |
|
130 |
final String wfName, |
|
131 |
final Map<String, String> pendingWfs, |
|
132 |
final Map<String, String> responses) { |
|
133 |
sendWfStatusMail(true, to, wfId, procId, wfName, pendingWfs, responses, ""); |
|
134 |
} |
|
135 |
|
|
136 |
public void sendFailedMail(final Set<String> to, |
|
137 |
final String wfId, |
|
138 |
final String procId, |
|
139 |
final String wfName, |
|
140 |
final Map<String, String> pendingWfs, |
|
141 |
final Map<String, String> responses, |
|
142 |
final String error) { |
|
143 |
sendWfStatusMail(false, to, wfId, procId, wfName, pendingWfs, responses, error); |
|
144 |
} |
|
145 |
|
|
146 |
private Properties obtainProperties() { |
|
147 |
final Properties props = new Properties(); |
|
148 |
props.put("mail.transport.protocol", "smtp"); |
|
149 |
props.put("mail.smtp.host", smtpHost); |
|
150 |
props.put("mail.smtp.port", smtpPort); |
|
151 |
props.put("mail.smtp.auth", Boolean.toString((smtpUser != null) && !smtpUser.isEmpty())); |
|
152 |
return props; |
|
153 |
} |
|
154 |
|
|
155 |
private Authenticator obtainAuthenticator() { |
|
156 |
if ((smtpUser == null) || smtpUser.isEmpty()) { return null; } |
|
157 |
|
|
158 |
return new Authenticator() { |
|
159 |
|
|
160 |
private final PasswordAuthentication authentication = new PasswordAuthentication(smtpUser, smtpPassword); |
|
161 |
|
|
162 |
@Override |
|
163 |
protected PasswordAuthentication getPasswordAuthentication() { |
|
164 |
return authentication; |
|
165 |
} |
|
166 |
|
|
167 |
}; |
|
168 |
} |
|
169 |
|
|
170 |
public String getFrom() { |
|
171 |
return from; |
|
172 |
} |
|
173 |
|
|
174 |
@Required |
|
175 |
public void setFrom(final String from) { |
|
176 |
this.from = from; |
|
177 |
} |
|
178 |
|
|
179 |
public String getFromName() { |
|
180 |
return fromName; |
|
181 |
} |
|
182 |
|
|
183 |
@Required |
|
184 |
public void setFromName(final String fromName) { |
|
185 |
this.fromName = fromName; |
|
186 |
} |
|
187 |
|
|
188 |
public String getCc() { |
|
189 |
return cc; |
|
190 |
} |
|
191 |
|
|
192 |
@Required |
|
193 |
public void setCc(final String cc) { |
|
194 |
this.cc = cc; |
|
195 |
} |
|
196 |
|
|
197 |
public String getSmtpHost() { |
|
198 |
return smtpHost; |
|
199 |
} |
|
200 |
|
|
201 |
@Required |
|
202 |
public void setSmtpHost(final String smtpHost) { |
|
203 |
this.smtpHost = smtpHost; |
|
204 |
} |
|
205 |
|
|
206 |
public int getSmtpPort() { |
|
207 |
return smtpPort; |
|
208 |
} |
|
209 |
|
|
210 |
public void setSmtpPort(final int smtpPort) { |
|
211 |
this.smtpPort = smtpPort; |
|
212 |
} |
|
213 |
|
|
214 |
public String getSmtpUser() { |
|
215 |
return smtpUser; |
|
216 |
} |
|
217 |
|
|
218 |
public void setSmtpUser(final String smtpUser) { |
|
219 |
this.smtpUser = smtpUser; |
|
220 |
} |
|
221 |
|
|
222 |
public String getSmtpPassword() { |
|
223 |
return smtpPassword; |
|
224 |
} |
|
225 |
|
|
226 |
public void setSmtpPassword(final String smtpPassword) { |
|
227 |
this.smtpPassword = smtpPassword; |
|
228 |
} |
|
229 |
|
|
230 |
public String getBaseUrl() { |
|
231 |
return baseUrl; |
|
232 |
} |
|
233 |
|
|
234 |
@Required |
|
235 |
public void setBaseUrl(final String baseUrl) { |
|
236 |
this.baseUrl = baseUrl; |
|
237 |
} |
|
238 |
|
|
239 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/metawf/DatasourceMetaWorkflowFactory.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.metawf; |
|
2 |
|
|
3 |
import org.springframework.context.ApplicationContext; |
|
4 |
import org.springframework.context.ApplicationContextAware; |
|
5 |
|
|
6 |
public class DatasourceMetaWorkflowFactory implements ApplicationContextAware { |
|
7 |
|
|
8 |
private transient ApplicationContext applicationContext; |
|
9 |
|
|
10 |
public DatasourceMetaWorkflow newMetaWorkflow(final String beanName) { |
|
11 |
final DatasourceMetaWorkflow prototypeMetaWf = (DatasourceMetaWorkflow) applicationContext.getBean(beanName, DatasourceMetaWorkflow.class); |
|
12 |
|
|
13 |
if (prototypeMetaWf != null) { |
|
14 |
return prototypeMetaWf; |
|
15 |
} else { |
|
16 |
throw new IllegalArgumentException("cannot find bean " + beanName); |
|
17 |
} |
|
18 |
} |
|
19 |
|
|
20 |
@Override |
|
21 |
public void setApplicationContext(final ApplicationContext context) { |
|
22 |
this.applicationContext = context; |
|
23 |
} |
|
24 |
|
|
25 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/metawf/WorkflowTree.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.metawf; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.io.StringWriter; |
|
5 |
import java.util.List; |
|
6 |
import java.util.Map; |
|
7 |
|
|
8 |
import javax.annotation.Resource; |
|
9 |
|
|
10 |
import org.apache.commons.lang.StringEscapeUtils; |
|
11 |
import org.springframework.beans.factory.annotation.Required; |
|
12 |
|
|
13 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException; |
|
14 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; |
|
15 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
16 |
|
|
17 |
public class WorkflowTree { |
|
18 |
|
|
19 |
private String id; |
|
20 |
|
|
21 |
private String name; |
|
22 |
|
|
23 |
private String start = "auto"; |
|
24 |
|
|
25 |
private List<WorkflowTree> children; |
|
26 |
|
|
27 |
private org.springframework.core.io.Resource template; |
|
28 |
|
|
29 |
@Resource |
|
30 |
private UniqueServiceLocator serviceLocator; |
|
31 |
|
|
32 |
public void populateMetaWfXml(final StringWriter sw) { |
|
33 |
sw.append("<WORKFLOW id='" + StringEscapeUtils.escapeXml(id) + "' name='" + StringEscapeUtils.escapeXml(name) + "'"); |
|
34 |
|
|
35 |
if (children == null || children.isEmpty()) { |
|
36 |
sw.append(" />"); |
|
37 |
} else { |
|
38 |
sw.append(">"); |
|
39 |
for (WorkflowTree child : children) { |
|
40 |
child.populateMetaWfXml(sw); |
|
41 |
} |
|
42 |
sw.append("</WORKFLOW>"); |
|
43 |
} |
|
44 |
} |
|
45 |
|
|
46 |
public int registerAllWorkflows(final Map<String, String> params) throws ISRegistryException, IOException { |
|
47 |
int count = 0; |
|
48 |
|
|
49 |
if (this.id == null || this.id.isEmpty()) { |
|
50 |
final String profile = WorkflowProfileCreator.generateProfile(name, "aggregator", params, template); |
|
51 |
this.id = serviceLocator.getService(ISRegistryService.class).registerProfile(profile); |
|
52 |
count++; |
|
53 |
} |
|
54 |
|
|
55 |
if (children != null) { |
|
56 |
for (WorkflowTree child : children) { |
|
57 |
count += child.registerAllWorkflows(params); |
|
58 |
} |
|
59 |
} |
|
60 |
return count; |
|
61 |
} |
|
62 |
|
|
63 |
public String getId() { |
|
64 |
return id; |
|
65 |
} |
|
66 |
|
|
67 |
public List<WorkflowTree> getChildren() { |
|
68 |
return children; |
|
69 |
} |
|
70 |
|
|
71 |
public void setChildren(final List<WorkflowTree> children) { |
|
72 |
this.children = children; |
|
73 |
} |
|
74 |
|
|
75 |
public String getName() { |
|
76 |
return name; |
|
77 |
} |
|
78 |
|
|
79 |
@Required |
|
80 |
public void setName(final String name) { |
|
81 |
this.name = name; |
|
82 |
} |
|
83 |
|
|
84 |
public org.springframework.core.io.Resource getTemplate() { |
|
85 |
return template; |
|
86 |
} |
|
87 |
|
|
88 |
@Required |
|
89 |
public void setTemplate(final org.springframework.core.io.Resource template) { |
|
90 |
this.template = template; |
|
91 |
} |
|
92 |
|
|
93 |
public String getStart() { |
|
94 |
return start; |
|
95 |
} |
|
96 |
|
|
97 |
public void setStart(final String start) { |
|
98 |
this.start = start; |
|
99 |
} |
|
100 |
|
|
101 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/metawf/WorkflowProfileCreator.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.metawf; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import org.antlr.stringtemplate.StringTemplate; |
|
7 |
import org.apache.commons.io.IOUtils; |
|
8 |
import org.springframework.core.io.ClassPathResource; |
|
9 |
import org.springframework.core.io.Resource; |
|
10 |
|
|
11 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; |
|
12 |
|
|
13 |
public class WorkflowProfileCreator { |
|
14 |
|
|
15 |
private static final Resource wfTemplate = new ClassPathResource("/eu/dnetlib/msro/workflows/templates/workflow.xml.st"); |
|
16 |
|
|
17 |
public static String generateProfile(final String name, final String type, final Map<String, String> params, final Resource confTemplate) |
|
18 |
throws IOException { |
|
19 |
|
|
20 |
final StringTemplate conf = new StringTemplate(IOUtils.toString(confTemplate.getInputStream())); |
|
21 |
conf.setAttribute("params", params); |
|
22 |
|
|
23 |
final StringTemplate profile = new StringTemplate(IOUtils.toString(wfTemplate.getInputStream())); |
|
24 |
profile.setAttribute("name", name); |
|
25 |
profile.setAttribute("type", type); |
|
26 |
profile.setAttribute("priority", WorkflowsConstants.DEFAULT_WF_PRIORITY); |
|
27 |
profile.setAttribute("conf", conf.toString()); |
|
28 |
|
|
29 |
return profile.toString(); |
|
30 |
} |
|
31 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/metawf/DatasourceMetaWorkflow.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.metawf; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.io.StringWriter; |
|
5 |
import java.util.Map; |
|
6 |
|
|
7 |
import org.springframework.beans.factory.annotation.Required; |
|
8 |
import org.springframework.core.io.Resource; |
|
9 |
|
|
10 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException; |
|
11 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; |
|
12 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
13 |
|
|
14 |
public class DatasourceMetaWorkflow { |
|
15 |
|
|
16 |
private WorkflowTree tree; |
|
17 |
|
|
18 |
private Resource destroyWorkflowTemplate; |
|
19 |
|
|
20 |
@javax.annotation.Resource |
|
21 |
private UniqueServiceLocator serviceLocator; |
|
22 |
|
|
23 |
public int registerAllWorkflows(final Map<String, String> params) throws ISRegistryException, IOException { |
|
24 |
return tree.registerAllWorkflows(params); |
|
25 |
} |
|
26 |
|
|
27 |
public String registerDestroyWorkflow(final Map<String, String> params) throws ISRegistryException, IOException { |
|
28 |
final String profile = WorkflowProfileCreator.generateProfile("Repo BYE", "REPO_BYE", params, destroyWorkflowTemplate); |
|
29 |
return serviceLocator.getService(ISRegistryService.class).registerProfile(profile); |
|
30 |
} |
|
31 |
|
|
32 |
public String asXML() { |
|
33 |
final StringWriter sw = new StringWriter(); |
|
34 |
tree.populateMetaWfXml(sw); |
|
35 |
return sw.toString(); |
|
36 |
} |
|
37 |
|
|
38 |
public WorkflowTree getTree() { |
|
39 |
return tree; |
|
40 |
} |
|
41 |
|
|
42 |
@Required |
|
43 |
public void setTree(final WorkflowTree tree) { |
|
44 |
this.tree = tree; |
|
45 |
} |
|
46 |
|
|
47 |
public Resource getDestroyWorkflowTemplate() { |
|
48 |
return destroyWorkflowTemplate; |
|
49 |
} |
|
50 |
|
|
51 |
@Required |
|
52 |
public void setDestroyWorkflowTemplate(final Resource destroyWorkflowTemplate) { |
|
53 |
this.destroyWorkflowTemplate = destroyWorkflowTemplate; |
|
54 |
} |
|
55 |
|
|
56 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/nodes/unpack/UnpackJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.nodes.unpack; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.util.Iterator; |
|
5 |
import java.util.Queue; |
|
6 |
import java.util.concurrent.PriorityBlockingQueue; |
|
7 |
|
|
8 |
import javax.xml.ws.wsaddressing.W3CEndpointReference; |
|
9 |
|
|
10 |
import org.apache.commons.logging.Log; |
|
11 |
import org.apache.commons.logging.LogFactory; |
|
12 |
import org.dom4j.Document; |
|
13 |
import org.dom4j.Node; |
|
14 |
import org.dom4j.io.SAXReader; |
|
15 |
import org.springframework.beans.factory.annotation.Required; |
|
16 |
|
|
17 |
import com.googlecode.sarasvati.Arc; |
|
18 |
import com.googlecode.sarasvati.NodeToken; |
|
19 |
|
|
20 |
import eu.dnetlib.enabling.resultset.IterableResultSetFactory; |
|
21 |
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory; |
|
22 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
23 |
|
|
24 |
public class UnpackJobNode extends SimpleJobNode { |
|
25 |
|
|
26 |
/** |
|
27 |
* logger. |
|
28 |
*/ |
|
29 |
private static final Log log = LogFactory.getLog(UnpackJobNode.class); |
|
30 |
|
|
31 |
private String inputEprParam; |
|
32 |
private String outputEprParam; |
|
33 |
private String xpath; |
|
34 |
|
|
35 |
private IterableResultSetFactory iterableResultSetFactory; |
|
36 |
private ResultSetClientFactory resultSetClientFactory; |
|
37 |
|
|
38 |
@Override |
|
39 |
protected String execute(final NodeToken token) throws Exception { |
|
40 |
final Iterator<String> client = resultSetClientFactory.getClient(token.getEnv().getAttribute(inputEprParam)).iterator(); |
|
41 |
final Queue<String> queue = new PriorityBlockingQueue<String>(); |
|
42 |
|
|
43 |
if (client.hasNext()) { |
|
44 |
populateQueue(queue, client.next(), xpath); |
|
45 |
} |
|
46 |
|
|
47 |
final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(new Iterable<String>() { |
|
48 |
|
|
49 |
@Override |
|
50 |
public Iterator<String> iterator() { |
|
51 |
return new Iterator<String>() { |
|
52 |
|
|
53 |
@Override |
|
54 |
public boolean hasNext() { |
|
55 |
synchronized (queue) { |
|
56 |
return !queue.isEmpty(); |
|
57 |
} |
|
58 |
} |
|
59 |
|
|
60 |
@Override |
|
61 |
public String next() { |
|
62 |
synchronized (queue) { |
|
63 |
final String res = queue.poll(); |
|
64 |
while (queue.isEmpty() && client.hasNext()) { |
|
65 |
populateQueue(queue, client.next(), xpath); |
|
66 |
} |
|
67 |
return res; |
|
68 |
} |
|
69 |
} |
|
70 |
|
|
71 |
@Override |
|
72 |
public void remove() {} |
|
73 |
}; |
|
74 |
} |
|
75 |
}); |
|
76 |
|
|
77 |
token.getEnv().setAttribute(outputEprParam, epr.toString()); |
|
78 |
|
|
79 |
return Arc.DEFAULT_ARC; |
|
80 |
} |
|
81 |
|
|
82 |
private void populateQueue(final Queue<String> queue, final String record, final String xpath) { |
|
83 |
try { |
|
84 |
final SAXReader reader = new SAXReader(); |
|
85 |
final Document doc = reader.read(new StringReader(record)); |
|
86 |
for (Object o : doc.selectNodes(xpath)) { |
|
87 |
queue.add(((Node) o).asXML()); |
|
88 |
} |
|
89 |
} catch (Exception e) { |
|
90 |
log.error("Error unpacking record: \n" + record, e); |
|
91 |
throw new RuntimeException(e); |
|
92 |
} |
|
93 |
} |
|
94 |
|
|
95 |
public IterableResultSetFactory getIterableResultSetFactory() { |
|
96 |
return iterableResultSetFactory; |
|
97 |
} |
|
98 |
|
|
99 |
@Required |
|
100 |
public void setIterableResultSetFactory(final IterableResultSetFactory iterableResultSetFactory) { |
|
101 |
this.iterableResultSetFactory = iterableResultSetFactory; |
|
102 |
} |
|
103 |
|
|
104 |
public ResultSetClientFactory getResultSetClientFactory() { |
|
105 |
return resultSetClientFactory; |
|
106 |
} |
|
107 |
|
|
108 |
@Required |
|
109 |
public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) { |
|
110 |
this.resultSetClientFactory = resultSetClientFactory; |
|
111 |
} |
|
112 |
|
|
113 |
public String getInputEprParam() { |
|
114 |
return inputEprParam; |
|
115 |
} |
|
116 |
|
|
117 |
public void setInputEprParam(final String inputEprParam) { |
|
118 |
this.inputEprParam = inputEprParam; |
|
119 |
} |
|
120 |
|
|
121 |
public String getOutputEprParam() { |
|
122 |
return outputEprParam; |
|
123 |
} |
|
124 |
|
|
125 |
public void setOutputEprParam(final String outputEprParam) { |
|
126 |
this.outputEprParam = outputEprParam; |
|
127 |
} |
|
128 |
|
|
129 |
public String getXpath() { |
|
130 |
return xpath; |
|
131 |
} |
|
132 |
|
|
133 |
public void setXpath(final String xpath) { |
|
134 |
this.xpath = xpath; |
|
135 |
} |
|
136 |
|
|
137 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/nodes/objectStore/RetrieveMdStoreId.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.nodes.objectStore; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
import java.util.Set; |
|
5 |
|
|
6 |
import javax.annotation.Resource; |
|
7 |
|
|
8 |
import org.springframework.beans.factory.annotation.Required; |
|
9 |
|
|
10 |
import com.google.common.collect.Lists; |
|
11 |
import com.google.common.collect.Sets; |
|
12 |
import com.google.gson.Gson; |
|
13 |
import com.googlecode.sarasvati.Arc; |
|
14 |
import com.googlecode.sarasvati.NodeToken; |
|
15 |
|
|
16 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
17 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
18 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
19 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
20 |
|
|
21 |
/** |
|
22 |
* The Class RetrieveMdStoreId is a job node used to retrieve the correct MDStore from which extract the url of the file to download. |
|
23 |
* metadata format and interpretation are injected as properties |
|
24 |
*/ |
|
25 |
public class RetrieveMdStoreId extends SimpleJobNode { |
|
26 |
|
|
27 |
/** The metadata format. */ |
|
28 |
private String metadataFormat; |
|
29 |
|
|
30 |
/** The interpretation. */ |
|
31 |
private String interpretation; |
|
32 |
|
|
33 |
/** The provider id. */ |
|
34 |
private String providerId; |
|
35 |
|
|
36 |
/** The service locator. */ |
|
37 |
@Resource |
|
38 |
private UniqueServiceLocator serviceLocator; |
|
39 |
|
|
40 |
/* |
|
41 |
* (non-Javadoc) |
|
42 |
* |
|
43 |
* @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken) |
|
44 |
*/ |
|
45 |
@Override |
|
46 |
protected String execute(final NodeToken token) throws Exception { |
|
47 |
|
|
48 |
String workflowQuery = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') where($x//DATAPROVIDER/@id='%s') return distinct-values($x//WORKFLOW/@id/string())"; |
|
49 |
|
|
50 |
List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(workflowQuery, providerId)); |
|
51 |
if (result.size() == 0) { throw new RuntimeException("there is no mdStore Associated to the provider " + token.getEnv().getAttribute(getProviderId())); } |
|
52 |
Set<String> workflowIds = Sets.newHashSet(result); |
|
53 |
|
|
54 |
Set<String> metadataIds = getMdStores(workflowIds); |
|
55 |
Gson g = new Gson(); |
|
56 |
token.getEnv().setAttribute("mdId", g.toJson(metadataIds)); |
|
57 |
|
|
58 |
token.getEnv().setAttribute("mdFormat", getMetadataFormat()); |
|
59 |
return Arc.DEFAULT_ARC; |
|
60 |
} |
|
61 |
|
|
62 |
private Set<String> getMdStores(final Set<String> workflowsId) { |
|
63 |
try { |
|
64 |
|
|
65 |
String query = "//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s']//PARAM[./@category/string()='MDSTORE_ID']/text()"; |
|
66 |
|
|
67 |
Set<String> mdStores = Sets.newHashSet(); |
|
68 |
|
|
69 |
if (workflowsId == null) { return null; } |
|
70 |
|
|
71 |
for (String workflowId : workflowsId) { |
|
72 |
List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(query, workflowId)); |
|
73 |
Set<String> metadataIds = Sets.newHashSet(result); |
|
74 |
mdStores.addAll(getRightMetadataId(Lists.newArrayList(metadataIds))); |
|
75 |
} |
|
76 |
return mdStores; |
|
77 |
|
|
78 |
} catch (ISLookUpException e) { |
|
79 |
|
|
80 |
return null; |
|
81 |
} |
|
82 |
} |
|
83 |
|
|
84 |
/** |
|
85 |
* Gets the right metadata id whith the format metadataFormat and interpretation interpretation |
|
86 |
* |
|
87 |
* @return the right metadata id |
|
88 |
* @throws ISLookUpException |
|
89 |
*/ |
|
90 |
private Set<String> getRightMetadataId(final Iterable<String> ids) throws ISLookUpException { |
|
91 |
String query = "concat(//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value=\"%s\"]//METADATA_FORMAT/text(), \"::<<>>::\"" |
|
92 |
+ ",//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value=\"%s\"]//METADATA_FORMAT_INTERPRETATION/text())"; |
|
93 |
Set<String> result = Sets.newHashSet(); |
|
94 |
|
|
95 |
for (String id : ids) { |
|
96 |
|
|
97 |
List<String> results = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(query, id, id)); |
|
98 |
if (results.size() > 0) { |
|
99 |
String[] values = results.get(0).split("::<<>>::"); |
|
100 |
if (metadataFormat.equals(values[0]) && interpretation.equals(values[1])) { |
|
101 |
result.add(id); |
|
102 |
} |
|
103 |
} |
|
104 |
} |
|
105 |
return result; |
|
106 |
|
|
107 |
} |
|
108 |
|
|
109 |
/** |
|
110 |
* Gets the interpretation. |
|
111 |
* |
|
112 |
* @return the interpretation |
|
113 |
*/ |
|
114 |
public String getInterpretation() { |
|
115 |
return interpretation; |
|
116 |
} |
|
117 |
|
|
118 |
/** |
|
119 |
* Sets the interpretation. |
|
120 |
* |
|
121 |
* @param interpretation |
|
122 |
* the interpretation to set |
|
123 |
*/ |
|
124 |
@Required |
|
125 |
public void setInterpretation(final String interpretation) { |
|
126 |
this.interpretation = interpretation; |
|
127 |
} |
|
128 |
|
|
129 |
/** |
|
130 |
* Gets the metadata format. |
|
131 |
* |
|
132 |
* @return the metadataFormat |
|
133 |
*/ |
|
134 |
public String getMetadataFormat() { |
|
135 |
return metadataFormat; |
|
136 |
} |
|
137 |
|
|
138 |
/** |
|
139 |
* Sets the metadata format. |
|
140 |
* |
|
141 |
* @param metadataFormat |
|
142 |
* the metadataFormat to set |
|
143 |
*/ |
|
144 |
@Required |
|
145 |
public void setMetadataFormat(final String metadataFormat) { |
|
146 |
this.metadataFormat = metadataFormat; |
|
147 |
} |
|
148 |
|
|
149 |
/** |
|
150 |
* Gets the provider id. |
|
151 |
* |
|
152 |
* @return the providerId |
|
153 |
*/ |
|
154 |
public String getProviderId() { |
|
155 |
return providerId; |
|
156 |
} |
|
157 |
|
|
158 |
/** |
|
159 |
* Sets the provider id. |
|
160 |
* |
|
161 |
* @param providerId |
|
162 |
* the providerId to set |
|
163 |
*/ |
|
164 |
public void setProviderId(final String providerId) { |
|
165 |
this.providerId = providerId; |
|
166 |
} |
|
167 |
|
|
168 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/nodes/objectStore/UpdateObjectStoreSizeJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.nodes.objectStore; |
|
2 |
|
|
3 |
import javax.annotation.Resource; |
|
4 |
|
|
5 |
import com.googlecode.sarasvati.Arc; |
|
6 |
import com.googlecode.sarasvati.NodeToken; |
|
7 |
|
|
8 |
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService; |
|
9 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; |
|
10 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
11 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
12 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
13 |
|
|
14 |
public class UpdateObjectStoreSizeJobNode extends SimpleJobNode { |
|
15 |
|
|
16 |
/** The obejct store id. */ |
|
17 |
private String obejctStoreIdParam; |
|
18 |
|
|
19 |
/** The service locator. */ |
|
20 |
@Resource |
|
21 |
private UniqueServiceLocator serviceLocator; |
|
22 |
|
|
23 |
@Override |
|
24 |
protected String execute(final NodeToken token) throws Exception { |
|
25 |
|
|
26 |
final ISRegistryService registry = serviceLocator.getService(ISRegistryService.class); |
|
27 |
|
|
28 |
int size = serviceLocator.getService(ObjectStoreService.class, obejctStoreIdParam).getSize(obejctStoreIdParam); |
|
29 |
|
|
30 |
String now = DateUtils.now_ISO8601(); |
|
31 |
|
|
32 |
String mdstoreXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + obejctStoreIdParam + "']" |
|
33 |
+ "return update value $x//LAST_STORAGE_DATE with '" + now + "'"; |
|
34 |
|
|
35 |
registry.executeXUpdate(mdstoreXUpdate); |
|
36 |
|
|
37 |
String mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + obejctStoreIdParam + "']" |
|
38 |
+ "return update value $x//COUNT_STORE with '" + size + "'"; |
|
39 |
|
|
40 |
registry.executeXUpdate(mdstoreNumberXUpdate); |
|
41 |
|
|
42 |
mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + obejctStoreIdParam + "']" |
|
43 |
+ "return update value $x//STORE_SIZE with '" + size + "'"; |
|
44 |
|
|
45 |
registry.executeXUpdate(mdstoreNumberXUpdate); |
|
46 |
|
|
47 |
return Arc.DEFAULT_ARC; |
|
48 |
} |
|
49 |
|
|
50 |
/** |
|
51 |
* @return the obejctStoreIdParam |
|
52 |
*/ |
|
53 |
public String getObejctStoreIdParam() { |
|
54 |
return obejctStoreIdParam; |
|
55 |
} |
|
56 |
|
|
57 |
/** |
|
58 |
* @param obejctStoreIdParam |
|
59 |
* the obejctStoreIdParam to set |
|
60 |
*/ |
|
61 |
public void setObejctStoreIdParam(final String obejctStoreIdParam) { |
|
62 |
this.obejctStoreIdParam = obejctStoreIdParam; |
|
63 |
} |
|
64 |
|
|
65 |
} |
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/nodes/objectStore/DownloadIntoObjectStoreJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workflows.nodes.objectStore; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.util.Iterator; |
|
5 |
import java.util.Map; |
|
6 |
|
|
7 |
import javax.xml.ws.wsaddressing.W3CEndpointReference; |
|
8 |
import javax.xml.xpath.XPath; |
|
9 |
import javax.xml.xpath.XPathFactory; |
|
10 |
|
|
11 |
import org.apache.commons.logging.Log; |
|
12 |
import org.apache.commons.logging.LogFactory; |
|
13 |
import org.xml.sax.InputSource; |
|
14 |
|
|
15 |
import com.googlecode.sarasvati.Engine; |
|
16 |
import com.googlecode.sarasvati.NodeToken; |
|
17 |
import com.googlecode.sarasvati.env.Env; |
|
18 |
|
|
19 |
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord; |
|
20 |
import eu.dnetlib.enabling.resultset.IterableResultSetFactory; |
|
21 |
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory; |
|
22 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
23 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
|
24 |
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode; |
|
25 |
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener; |
|
26 |
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory; |
|
27 |
import eu.dnetlib.msro.workflows.util.ProgressProvider; |
|
28 |
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider; |
|
29 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; |
|
30 |
|
|
31 |
public class DownloadIntoObjectStoreJobNode extends BlackboardJobNode implements ProgressJobNode { |
|
32 |
|
|
33 |
private static final Log log = LogFactory.getLog(DownloadIntoObjectStoreJobNode.class); |
|
34 |
|
|
35 |
class MetadataObjectIterator implements Iterator<String> { |
|
36 |
|
|
37 |
private Iterator<String> inputIterator; |
|
38 |
|
|
39 |
private String mime; |
|
40 |
|
|
41 |
public MetadataObjectIterator(final Iterator<String> inputIterator, final String xpath, final String mime) { |
|
42 |
this.inputIterator = inputIterator; |
|
43 |
} |
|
44 |
|
|
45 |
@Override |
|
46 |
public boolean hasNext() { |
|
47 |
return inputIterator.hasNext(); |
|
48 |
} |
|
49 |
|
|
50 |
@Override |
|
51 |
public String next() { |
|
52 |
try { |
|
53 |
String record = inputIterator.next(); |
|
54 |
XPath xpath = XPathFactory.newInstance().newXPath(); |
|
55 |
InputSource doc = new InputSource(new StringReader(record)); |
|
56 |
String identifier = xpath.evaluate(getIdXpath(), doc); |
|
57 |
MetadataObjectRecord objectrecord = new MetadataObjectRecord(identifier, record, mime); |
|
58 |
return objectrecord.toJSON(); |
|
59 |
} catch (Exception e) { |
|
60 |
return null; |
|
61 |
} |
|
62 |
} |
|
63 |
|
|
64 |
@Override |
|
65 |
public void remove() { |
|
66 |
|
|
67 |
} |
|
68 |
|
|
69 |
} |
|
70 |
|
|
71 |
private String eprParam; |
|
72 |
|
|
73 |
private String objectStoreId; |
|
74 |
|
|
75 |
private String idXpath; // "//*[local-name()='objIdentifier'] |
|
76 |
|
|
77 |
private String contentDescription; |
|
78 |
|
|
79 |
private String objectIsInsideEpr; |
|
80 |
|
|
81 |
private IterableResultSetFactory iterableResultSetFactory; |
|
82 |
private ResultSetClientFactory resultSetClientFactory; |
|
83 |
|
|
84 |
private ResultsetProgressProvider progressProvider; |
|
85 |
private ProcessCountingResultSetFactory processCountingResultSetFactory; |
|
86 |
|
|
87 |
public String getEprParam() { |
|
88 |
return eprParam; |
|
89 |
} |
|
90 |
|
|
91 |
public void setEprParam(final String eprParam) { |
|
92 |
this.eprParam = eprParam; |
|
93 |
} |
|
94 |
|
|
95 |
public String getObjectStoreId() { |
|
96 |
return objectStoreId; |
|
97 |
} |
|
98 |
|
|
99 |
public void setObjectStoreId(final String objectStoreId) { |
|
100 |
this.objectStoreId = objectStoreId; |
|
101 |
} |
|
102 |
|
|
103 |
public ProgressProvider getProgressProvider(final NodeToken token) { |
|
104 |
|
|
105 |
return progressProvider; |
|
106 |
} |
|
107 |
|
|
108 |
@Override |
|
109 |
protected String getXqueryForServiceId(final NodeToken token) { |
|
110 |
return "//RESOURCE_IDENTIFIER[../RESOURCE_TYPE/@value='ObjectStoreServiceResourceType']/@value/string()"; |
|
111 |
} |
|
112 |
|
|
113 |
@Override |
|
114 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
115 |
|
|
116 |
job.setAction("FEEDOBJECT"); |
|
117 |
final String eprS = token.getEnv().getAttribute(getEprParam()); |
|
118 |
job.getParameters().put("obsID", getObjectStoreId()); |
|
119 |
job.getParameters().put("mime", getContentDescription()); |
|
120 |
final Iterator<String> client = resultSetClientFactory.getClient(eprS).iterator(); |
|
121 |
|
|
122 |
final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(new Iterable<String>() { |
|
123 |
|
|
124 |
@Override |
|
125 |
public Iterator<String> iterator() { |
|
126 |
return new MetadataObjectIterator(client, "//*[local-name()='objIdentifier']", "xml"); |
|
127 |
} |
|
128 |
}); |
|
129 |
this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr); |
|
130 |
job.getParameters().put("epr", progressProvider.getEpr().toString()); |
|
131 |
|
|
132 |
} |
|
133 |
|
|
134 |
@Override |
|
135 |
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) { |
|
136 |
return new BlackboardWorkflowJobListener(engine, token) { |
|
137 |
|
|
138 |
@Override |
|
139 |
protected void populateEnv(final Env env, final Map<String, String> responseParams) { |
|
140 |
log.info("Number of stored records: " + responseParams.get("total")); |
|
141 |
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total")); |
|
142 |
} |
|
143 |
}; |
|
144 |
} |
|
145 |
|
|
146 |
public String getObjectIsInsideEpr() { |
|
147 |
return objectIsInsideEpr; |
|
148 |
} |
|
149 |
|
|
150 |
public void setObjectIsInsideEpr(final String objectIsInsideEpr) { |
|
151 |
this.objectIsInsideEpr = objectIsInsideEpr; |
|
152 |
} |
|
153 |
|
|
154 |
@Override |
|
155 |
public ResultsetProgressProvider getProgressProvider() { |
|
156 |
return progressProvider; |
|
157 |
} |
|
158 |
|
|
159 |
public void setProgressProvider(final ResultsetProgressProvider progressProvider) { |
|
160 |
this.progressProvider = progressProvider; |
|
161 |
} |
|
162 |
|
|
163 |
public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() { |
|
164 |
return processCountingResultSetFactory; |
|
165 |
} |
|
166 |
|
|
167 |
public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) { |
|
168 |
this.processCountingResultSetFactory = processCountingResultSetFactory; |
|
169 |
} |
|
170 |
|
|
171 |
public ResultSetClientFactory getResultSetClientFactory() { |
|
172 |
return resultSetClientFactory; |
|
173 |
} |
|
174 |
|
|
175 |
public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) { |
|
176 |
this.resultSetClientFactory = resultSetClientFactory; |
|
177 |
} |
|
178 |
|
|
179 |
public IterableResultSetFactory getIterableResultSetFactory() { |
|
180 |
return iterableResultSetFactory; |
|
181 |
} |
|
182 |
|
Also available in: Unified diff
[maven-release-plugin] copy for tag dnet-msro-service-2.0.3