Project

General

Profile

« Previous | Next » 

Revision 32748

[maven-release-plugin] copy for tag dnet-msro-service-2.0.3

View differences:

modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/deploy.info
1
{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-msro-service/trunk/", "deploy_repository": "dnet4-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet4-snapshots", "name": "dnet-msro-service"}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/test/java/eu/dnetlib/AppTest.java
1
package eu.dnetlib;
2

  
3
import java.io.StringWriter;
4

  
5
import org.junit.Test;
6
import org.w3c.dom.Node;
7
import org.w3c.dom.NodeList;
8

  
9
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
10

  
11
/**
12
 * Unit test for simple App.
13
 */
14
public class AppTest {
15

  
16
	@Test
17
	public void testXslt() {
18
		String res = (new ApplyXslt(getXslt())).evaluate(getXml());
19
		System.out.println("******************");
20
		System.out.println(res);
21
		System.out.println("******************");
22
	}
23

  
24
	
25
	public static String javaMethod(NodeList list) {
26
		System.out.println("******************");
27
		System.out.println("TYPE   : " + list.toString());
28
		System.out.println("LENGTH : " + list.getLength());
29
		for (int i = 0; i<list.getLength(); i++) {
30
			Node node = list.item(i);
31
			
32
			System.out.println("ELEM " + i + ": " + node.getLocalName());
33
		}
34
		
35
		System.out.println("******************");
36
		return "SUCA";
37
	}
38
	
39

  
40
	private String getXml() {
41
		final StringWriter sw = new StringWriter();
42
		sw.append("<?xml version='1.0' encoding='UTF-8'?>\n");
43
		sw.append("<record>\n");
44
		sw.append("<metadata>\n");
45
		sw.append("<a>A text value</a>\n");
46
		sw.append("<b attr='attribute value'/>\n");
47
		sw.append("</metadata>\n");
48
		sw.append("</record>\n");
49

  
50
		return sw.toString();
51
	}
52

  
53
	private String getXslt() {
54
		final StringWriter sw = new StringWriter();
55

  
56
		sw.append("<?xml version='1.0' encoding='UTF-8'?>\n");
57
		sw.append("<xsl:stylesheet version='1.0' xmlns:xsl='http://www.w3.org/1999/XSL/Transform' xmlns:dnet='eu.dnetlib.AppTest' exclude-result-prefixes='xsl dnet'>\n");
58
		sw.append("<xsl:output omit-xml-declaration='yes' indent='yes'/>\n");
59
		sw.append("<xsl:template match='/*'>\n");
60
		sw.append("<xsl:variable name='metadata' select=\"//*[local-name()='metadata']/*\" />\n");
61
		sw.append("<ROWS>\n");
62
		sw.append("<xsl:value-of select='dnet:javaMethod($metadata)'/>\n");
63
		sw.append("</ROWS>\n");
64
		sw.append("</xsl:template>\n");
65
		sw.append("</xsl:stylesheet>\n");
66
		return sw.toString();
67
	}
68
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/cron/ScheduledWorkflowLauncher.java
1
package eu.dnetlib.msro.cron;
2

  
3
import java.util.Date;
4

  
5
import javax.annotation.Resource;
6

  
7
import org.apache.commons.lang.StringUtils;
8
import org.apache.commons.lang.math.NumberUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.quartz.CronExpression;
12
import org.springframework.beans.factory.annotation.Required;
13

  
14
import com.googlecode.sarasvati.GraphProcess;
15

  
16
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
18
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
19
import eu.dnetlib.miscutils.datetime.DateUtils;
20
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
21
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
22

  
23
public class ScheduledWorkflowLauncher {
24

  
25
	private static final Log log = LogFactory.getLog(ScheduledWorkflowLauncher.class);
26

  
27
	private static final DateUtils dateUtils = new DateUtils();
28

  
29
	private WorkflowExecutor workflowExecutor;
30

  
31
	private GraphProcessRegistry graphProcessRegistry;
32

  
33
	private int windowSize = 7200000; // 2 hours
34

  
35
	@Resource
36
	private UniqueServiceLocator serviceLocator;
37

  
38
	public void verifySheduledWorkflows() {
39
		log.debug("Verifying scheduled workflows - START");
40

  
41
		final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') " +
42
				"where $x//CONFIGURATION/@status='EXECUTABLE' " +
43
				"and $x//SCHEDULING/@enabled='true' " +
44
				"return concat($x//RESOURCE_IDENTIFIER/@value, ' @@@ ', $x//SCHEDULING/CRON, ' @@@ ', $x//SCHEDULING/MININTERVAL)";
45

  
46
		try {
47
			for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
48
				final String[] arr = s.split("@@@");
49
				final String id = arr[0].trim();
50
				final String cron = arr[1].trim();
51
				final int minInterval = NumberUtils.toInt(arr[2].trim(), 0) * 60000; // MINUTES to MILLIS
52

  
53
				if (CronExpression.isValidExpression(cron)) {
54
					final Date now = new Date();
55
					final Date last = calculateLastExecutionDate(id);
56
					final int elapsed = Math.abs(Math.round(now.getTime() - last.getTime()));
57

  
58
					if (log.isDebugEnabled()) {
59
						log.debug("**************************************************************");
60
						log.debug("META WORKFLOW ID   : " + id);
61
						log.debug("NOW                : " + now);
62
						log.debug("LAST EXECUTION DATE: " + last);
63
						log.debug("MIN INTERVAL       : " + minInterval);
64
						log.debug("WINDOW SIZE        : " + windowSize);
65
						log.debug("TIME ELAPSED       : " + elapsed);
66
					}
67

  
68
					if (elapsed > minInterval && isFired(cron, last, now) && !isAlreadyRunning(id)) {
69
						log.debug("MUST BE EXECUTED   : true");
70
						try {
71
							workflowExecutor.startMetaWorkflow(id, false);
72
						} catch (Exception e) {
73
							log.error("Error launching scheduled wf: " + id, e);
74
						}
75
					} else {
76
						log.debug("MUST BE EXECUTED   : false");
77
					}
78
					log.debug("**************************************************************");
79
				}
80
			}
81
		} catch (ISLookUpException e) {
82
			log.error("Error executing query " + query);
83
		}
84

  
85
		log.debug("Verifying scheduled workflows - END");
86
	}
87

  
88
	private boolean isFired(final String cronExpression, final Date startDate, final Date now) {
89
		try {
90
			final CronExpression cron = new CronExpression(cronExpression);
91

  
92
			final Date prev = new Date(now.getTime() - windowSize);
93
			final Date date = prev.getTime() < startDate.getTime() ? startDate : prev;
94
			final Date next = cron.getNextValidTimeAfter(date);
95

  
96
			if (log.isDebugEnabled()) {
97
				log.debug("NEXT EXECUTION DATE: " + next);
98
				log.debug("FIRED              : " + (next.getTime() < now.getTime()));
99
			}
100
			return next.getTime() < now.getTime();
101
		} catch (Exception e) {
102
			log.error("Error calculating next cron event: " + cronExpression, e);
103
			return false;
104
		}
105
	}
106

  
107
	private boolean isAlreadyRunning(final String metaWfId) {
108
		final String query = "document('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType/" + StringUtils.substringBefore(metaWfId, "_")
109
				+ "')//WORKFLOW/@id/string()";
110

  
111
		try {
112
			for (String profileId : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
113
				if (profileId.length() > 0) {
114
					for (GraphProcess p : graphProcessRegistry.findProcessesByResource(profileId)) {
115
						switch (p.getState()) {
116
						case Created:
117
							return true;
118
						case Executing:
119
							return true;
120
						default:
121
							break;
122
						}
123
					}
124
				}
125
			}
126
		} catch (ISLookUpException e) {
127
			log.error("Error executing query " + query);
128
		}
129
		return false;
130
	}
131

  
132
	private Date calculateLastExecutionDate(final String id) {
133
		final String query = "for $id in document('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType/" + StringUtils.substringBefore(id, "_")
134
				+ "')//WORKFLOW/@id/string() " +
135
				"for $x in document(concat('/db/DRIVER/WorkflowDSResources/WorkflowDSResourceType/', substring-before($id, '_'))) " +
136
				"where $x//LAST_EXECUTION_STATUS = 'SUCCESS' " +
137
				"return $x//LAST_EXECUTION_DATE/text() ";
138

  
139
		long time = 0;
140
		try {
141
			for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
142
				if (s.length() > 0) {
143
					final Date d = dateUtils.parse(s);
144
					if (time < d.getTime()) {
145
						time = d.getTime();
146
					}
147
				}
148
			}
149
		} catch (ISLookUpException e) {
150
			log.error("Error executing query " + query);
151
		} catch (Exception e) {
152
			log.error("Error calculating date", e);
153
		}
154
		return new Date(time);
155
	}
156

  
157
	public WorkflowExecutor getWorkflowExecutor() {
158
		return workflowExecutor;
159
	}
160

  
161
	@Required
162
	public void setWorkflowExecutor(final WorkflowExecutor workflowExecutor) {
163
		this.workflowExecutor = workflowExecutor;
164
	}
165

  
166
	public GraphProcessRegistry getGraphProcessRegistry() {
167
		return graphProcessRegistry;
168
	}
169

  
170
	@Required
171
	public void setGraphProcessRegistry(final GraphProcessRegistry graphProcessRegistry) {
172
		this.graphProcessRegistry = graphProcessRegistry;
173
	}
174

  
175
	public int getWindowSize() {
176
		return windowSize;
177
	}
178

  
179
	@Required
180
	public void setWindowSize(final int windowSize) {
181
		this.windowSize = windowSize;
182
	}
183

  
184
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/MSROServiceImpl.java
1
package eu.dnetlib.msro;
2

  
3
import javax.jws.WebService;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Required;
8

  
9
import eu.dnetlib.enabling.tools.AbstractBaseService;
10
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
11
import eu.dnetlib.msro.rmi.MSROService;
12
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
13

  
14
@WebService(targetNamespace = "http://services.dnetlib.eu/")
15
public class MSROServiceImpl extends AbstractBaseService implements MSROService {
16

  
17
	/**
18
	 * logger.
19
	 */
20
	private static final Log log = LogFactory.getLog(MSROServiceImpl.class); // NOPMD by marko on 11/24/08 5:02 PM
21

  
22
	/**
23
	 * notification handler.
24
	 */
25
	private NotificationHandler notificationHandler;
26

  
27
	/**
28
	 * graph process registry.
29
	 */
30
	private GraphProcessRegistry processRegistry;
31

  
32
	@Override
33
	public void notify(final String subscriptionId, final String topic, final String isId, final String message) {
34
		super.notify(subscriptionId, topic, isId, message);
35

  
36
		log.debug("got notification: " + topic);
37

  
38
		getNotificationHandler().notified(subscriptionId, topic, isId, message);
39
	}
40

  
41
	@Required
42
	public void setNotificationHandler(final NotificationHandler notHandler) {
43
		this.notificationHandler = notHandler;
44
	}
45

  
46
	public NotificationHandler getNotificationHandler() {
47
		return notificationHandler;
48
	}
49

  
50
	@Required
51
	public void setProcessRegistry(final GraphProcessRegistry processRegistry) {
52
		this.processRegistry = processRegistry;
53
	}
54

  
55
	public GraphProcessRegistry getProcessRegistry() {
56
		return processRegistry;
57
	}
58

  
59
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/notification/WfDependencyLauncherNotificationHandler.java
1
package eu.dnetlib.msro.notification;
2

  
3
import java.io.StringReader;
4
import java.util.Map;
5
import java.util.Set;
6

  
7
import javax.annotation.Resource;
8

  
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.dom4j.Document;
12
import org.dom4j.DocumentException;
13
import org.dom4j.Node;
14
import org.dom4j.io.SAXReader;
15

  
16
import com.google.common.base.Splitter;
17
import com.google.common.collect.Maps;
18
import com.google.common.collect.Sets;
19

  
20
import eu.dnetlib.enabling.actions.AbstractSubscriptionAction;
21
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24
import eu.dnetlib.msro.workflows.sarasvati.loader.WorkflowExecutor;
25

  
26
public class WfDependencyLauncherNotificationHandler extends AbstractSubscriptionAction {
27

  
28
	@Resource
29
	private UniqueServiceLocator serviceLocator;
30

  
31
	@Resource
32
	private WorkflowExecutor workflowExecutor;
33

  
34
	@Resource
35
	private EmailDispatcher emailDispatcher;
36

  
37
	private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
38

  
39
	@Override
40
	public void notified(final String subscrId, final String topic, final String rsId, final String profile) {
41

  
42
		final SAXReader reader = new SAXReader();
43
		try {
44
			final Document doc = reader.read(new StringReader(profile));
45

  
46
			final Set<String> emails = calculateEmails(rsId);
47
			final String procId = doc.valueOf("//LAST_EXECUTION_ID");
48
			final String wfName = doc.valueOf("//WORKFLOW_NAME");
49
			final boolean success = doc.valueOf("//LAST_EXECUTION_STATUS").equals("SUCCESS");
50
			final Map<String, String> responses = Maps.newHashMap();
51

  
52
			for (Object o : doc.selectNodes("//LAST_EXECUTION_OUTPUT")) {
53
				Node n = (Node) o;
54
				responses.put(n.valueOf("@name"), n.getText());
55
			}
56

  
57
			if (!success) {
58
				log.info("Last execution of " + rsId + " failed, dependencies NOT STARTED");
59
			}
60

  
61
			final String query = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType')//WORKFLOW[@id='" + rsId
62
					+ "']/WORKFLOW let $y := /RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = $x/@id] "
63
					+ "where $y//CONFIGURATION/@start != 'disabled' return concat ($x/@id , ' @@@ ', $x/@name , ' @@@ ', $y//CONFIGURATION/@start)";
64

  
65
			try {
66
				final Map<String, String> pendingWfs = Maps.newHashMap();
67

  
68
				for (String s : serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query)) {
69
					final String[] arr = s.split("@@@");
70
					final String id = arr[0].trim();
71
					final String name = arr[1].trim();
72
					final boolean manual = arr[2].trim().toLowerCase().equals("manual");
73
					if (success && !manual) {
74
						try {
75
							String pid = workflowExecutor.startProcess(id);
76
							log.info("PROC " + pid + " of WF " + id + " STARTED AS CHILD OF " + rsId);
77
						} catch (Exception e) {
78
							log.error("Error starting wf: " + id);
79
						}
80
					} else {
81
						pendingWfs.put(id, name);
82
					}
83
				}
84

  
85
				if (!emails.isEmpty()) {
86
					if (success) {
87
						emailDispatcher.sendSuccessMail(emails, rsId, procId, wfName, pendingWfs, responses);
88
					} else {
89
						final String error = doc.valueOf("//LAST_EXECUTION_ERROR");
90
						emailDispatcher.sendFailedMail(emails, rsId, procId, wfName, pendingWfs, responses, error);
91
					}
92
				}
93
			} catch (ISLookUpException e) {
94
				log.error("Error executing xquery: " + query, e);
95
			}
96
		} catch (DocumentException e) {
97
			log.error("Error parsing profile with id " + rsId + ": " + profile);
98
		}
99
	}
100

  
101
	private Set<String> calculateEmails(final String id) {
102
		final Set<String> list = Sets.newHashSet();
103
		try {
104
			for (String val : serviceLocator.getService(ISLookUpService.class).quickSearchProfile("//ADMIN_EMAIL[..//WORKFLOW/@id='" + id + "']/text()")) {
105
				for (String s : Splitter.on(",").trimResults().omitEmptyStrings().split(val)) {
106
					list.add(s);
107
				}
108
			}
109
		} catch (Exception e) {
110
			log.error("Error searching email adresses", e);
111
		}
112
		return list;
113
	}
114
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/notification/EmailDispatcher.java
1
package eu.dnetlib.msro.notification;
2

  
3
import java.util.Arrays;
4
import java.util.Date;
5
import java.util.Map;
6
import java.util.Properties;
7
import java.util.Set;
8
import java.util.concurrent.BlockingQueue;
9
import java.util.concurrent.LinkedBlockingQueue;
10

  
11
import javax.mail.Authenticator;
12
import javax.mail.Message;
13
import javax.mail.MessagingException;
14
import javax.mail.PasswordAuthentication;
15
import javax.mail.Session;
16
import javax.mail.Transport;
17
import javax.mail.internet.InternetAddress;
18
import javax.mail.internet.MimeMessage;
19

  
20
import org.antlr.stringtemplate.StringTemplate;
21
import org.apache.commons.io.IOUtils;
22
import org.apache.commons.logging.Log;
23
import org.apache.commons.logging.LogFactory;
24
import org.springframework.beans.factory.annotation.Required;
25

  
26
import com.google.common.base.Splitter;
27
import com.google.common.collect.Maps;
28

  
29
public class EmailDispatcher {
30

  
31
	private String from;
32
	private String fromName;
33
	private String cc;
34
	private String smtpHost;
35
	private int smtpPort = 587;
36
	private String smtpUser;
37
	private String smtpPassword;
38
	private String baseUrl;
39

  
40
	private static final Log log = LogFactory.getLog(EmailDispatcher.class);
41

  
42
	private final BlockingQueue<Message> queue = new LinkedBlockingQueue<Message>();
43

  
44
	public void sendMail(final Set<String> to, final String subject, final String template, final Map<String, Object> tmplParams) {
45
		try {
46
			final StringTemplate st = new StringTemplate(template);
47
			st.setAttributes(tmplParams);
48
			st.setAttribute("baseUrl", baseUrl);
49

  
50
			final Session session = Session.getInstance(obtainProperties(), obtainAuthenticator());
51

  
52
			final MimeMessage message = new MimeMessage(session);
53
			message.setFrom(new InternetAddress(from, fromName));
54
			message.setSubject(subject);
55
			message.setContent(st.toString(), "text/html; charset=utf-8");
56
			message.setSentDate(new Date());
57

  
58
			for (String s : to) {
59
				message.addRecipient(Message.RecipientType.TO, new InternetAddress(s));
60
			}
61
			if ((cc != null) && !cc.isEmpty()) {
62
				for (String aCC : Splitter.on(",").omitEmptyStrings().trimResults().split(getCc())) {
63
					message.addRecipient(Message.RecipientType.CC, new InternetAddress(aCC));
64
				}
65
			}
66

  
67
			queue.add(message);
68

  
69
			log.info("Mail to " + Arrays.toString(to.toArray()) + " in queue");
70
		} catch (Exception e) {
71
			log.error("Error sending mail", e);
72
		}
73
	}
74

  
75
	public void processMailQueue() {
76
		while (true) {
77
			final Message message = queue.poll();
78
			if (message == null) {
79
				return;
80
			} else {
81
				try {
82
					log.info("Sending mail...");
83
					Transport.send(message);
84
					log.info("...sent");
85
				} catch (MessagingException e) {
86
					log.error("Error sending email", e);
87
					queue.add(message);
88
					return;
89
				}
90
			}
91
		}
92
	}
93

  
94
	private void sendWfStatusMail(final boolean success,
95
			final Set<String> to,
96
			final String wfId,
97
			final String procId,
98
			final String wfName,
99
			final Map<String, String> pendingWfs,
100
			final Map<String, String> responses,
101
			final String error) {
102
		try {
103
			final Map<String, Object> map = Maps.newHashMap();
104
			map.put("wfId", wfId);
105
			map.put("wfName", wfName);
106
			map.put("procId", procId);
107
			if ((pendingWfs != null) && !pendingWfs.isEmpty()) {
108
				map.put("pendingWfs", pendingWfs);
109
			}
110
			if ((responses != null) && !responses.isEmpty()) {
111
				map.put("responses", responses);
112
			}
113
			if ((error != null) && !error.isEmpty()) {
114
				map.put("error", error);
115
			}
116

  
117
			final String subject = success ? "Workflow '" + wfName + "' has been completed successfully" : "Workflow '" + wfName + "' is failed";
118
			final String tmplName = success ? "wf_success.mail.st" : "wf_failed.mail.st";
119
			final String template = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/msro/mail/" + tmplName));
120

  
121
			sendMail(to, subject, template, map);
122
		} catch (Exception e) {
123
			log.error("Error generating success-mail", e);
124
		}
125
	}
126

  
127
	public void sendSuccessMail(final Set<String> to,
128
			final String wfId,
129
			final String procId,
130
			final String wfName,
131
			final Map<String, String> pendingWfs,
132
			final Map<String, String> responses) {
133
		sendWfStatusMail(true, to, wfId, procId, wfName, pendingWfs, responses, "");
134
	}
135

  
136
	public void sendFailedMail(final Set<String> to,
137
			final String wfId,
138
			final String procId,
139
			final String wfName,
140
			final Map<String, String> pendingWfs,
141
			final Map<String, String> responses,
142
			final String error) {
143
		sendWfStatusMail(false, to, wfId, procId, wfName, pendingWfs, responses, error);
144
	}
145

  
146
	private Properties obtainProperties() {
147
		final Properties props = new Properties();
148
		props.put("mail.transport.protocol", "smtp");
149
		props.put("mail.smtp.host", smtpHost);
150
		props.put("mail.smtp.port", smtpPort);
151
		props.put("mail.smtp.auth", Boolean.toString((smtpUser != null) && !smtpUser.isEmpty()));
152
		return props;
153
	}
154

  
155
	private Authenticator obtainAuthenticator() {
156
		if ((smtpUser == null) || smtpUser.isEmpty()) { return null; }
157

  
158
		return new Authenticator() {
159

  
160
			private final PasswordAuthentication authentication = new PasswordAuthentication(smtpUser, smtpPassword);
161

  
162
			@Override
163
			protected PasswordAuthentication getPasswordAuthentication() {
164
				return authentication;
165
			}
166

  
167
		};
168
	}
169

  
170
	public String getFrom() {
171
		return from;
172
	}
173

  
174
	@Required
175
	public void setFrom(final String from) {
176
		this.from = from;
177
	}
178

  
179
	public String getFromName() {
180
		return fromName;
181
	}
182

  
183
	@Required
184
	public void setFromName(final String fromName) {
185
		this.fromName = fromName;
186
	}
187

  
188
	public String getCc() {
189
		return cc;
190
	}
191

  
192
	@Required
193
	public void setCc(final String cc) {
194
		this.cc = cc;
195
	}
196

  
197
	public String getSmtpHost() {
198
		return smtpHost;
199
	}
200

  
201
	@Required
202
	public void setSmtpHost(final String smtpHost) {
203
		this.smtpHost = smtpHost;
204
	}
205

  
206
	public int getSmtpPort() {
207
		return smtpPort;
208
	}
209

  
210
	public void setSmtpPort(final int smtpPort) {
211
		this.smtpPort = smtpPort;
212
	}
213

  
214
	public String getSmtpUser() {
215
		return smtpUser;
216
	}
217

  
218
	public void setSmtpUser(final String smtpUser) {
219
		this.smtpUser = smtpUser;
220
	}
221

  
222
	public String getSmtpPassword() {
223
		return smtpPassword;
224
	}
225

  
226
	public void setSmtpPassword(final String smtpPassword) {
227
		this.smtpPassword = smtpPassword;
228
	}
229

  
230
	public String getBaseUrl() {
231
		return baseUrl;
232
	}
233

  
234
	@Required
235
	public void setBaseUrl(final String baseUrl) {
236
		this.baseUrl = baseUrl;
237
	}
238

  
239
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/metawf/DatasourceMetaWorkflowFactory.java
1
package eu.dnetlib.msro.workflows.metawf;
2

  
3
import org.springframework.context.ApplicationContext;
4
import org.springframework.context.ApplicationContextAware;
5

  
6
public class DatasourceMetaWorkflowFactory implements ApplicationContextAware {
7

  
8
	private transient ApplicationContext applicationContext;
9

  
10
	public DatasourceMetaWorkflow newMetaWorkflow(final String beanName) {
11
		final DatasourceMetaWorkflow prototypeMetaWf = (DatasourceMetaWorkflow) applicationContext.getBean(beanName, DatasourceMetaWorkflow.class);
12

  
13
		if (prototypeMetaWf != null) {
14
			return prototypeMetaWf;
15
		} else {
16
			throw new IllegalArgumentException("cannot find bean " + beanName);
17
		}
18
	}
19

  
20
	@Override
21
	public void setApplicationContext(final ApplicationContext context) {
22
		this.applicationContext = context;
23
	}
24

  
25
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/metawf/WorkflowTree.java
1
package eu.dnetlib.msro.workflows.metawf;
2

  
3
import java.io.IOException;
4
import java.io.StringWriter;
5
import java.util.List;
6
import java.util.Map;
7

  
8
import javax.annotation.Resource;
9

  
10
import org.apache.commons.lang.StringEscapeUtils;
11
import org.springframework.beans.factory.annotation.Required;
12

  
13
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
14
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
15
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
16

  
17
public class WorkflowTree {
18

  
19
	private String id;
20

  
21
	private String name;
22

  
23
	private String start = "auto";
24

  
25
	private List<WorkflowTree> children;
26

  
27
	private org.springframework.core.io.Resource template;
28

  
29
	@Resource
30
	private UniqueServiceLocator serviceLocator;
31

  
32
	public void populateMetaWfXml(final StringWriter sw) {
33
		sw.append("<WORKFLOW id='" + StringEscapeUtils.escapeXml(id) + "' name='" + StringEscapeUtils.escapeXml(name) + "'");
34

  
35
		if (children == null || children.isEmpty()) {
36
			sw.append(" />");
37
		} else {
38
			sw.append(">");
39
			for (WorkflowTree child : children) {
40
				child.populateMetaWfXml(sw);
41
			}
42
			sw.append("</WORKFLOW>");
43
		}
44
	}
45

  
46
	public int registerAllWorkflows(final Map<String, String> params) throws ISRegistryException, IOException {
47
		int count = 0;
48

  
49
		if (this.id == null || this.id.isEmpty()) {
50
			final String profile = WorkflowProfileCreator.generateProfile(name, "aggregator", params, template);
51
			this.id = serviceLocator.getService(ISRegistryService.class).registerProfile(profile);
52
			count++;
53
		}
54

  
55
		if (children != null) {
56
			for (WorkflowTree child : children) {
57
				count += child.registerAllWorkflows(params);
58
			}
59
		}
60
		return count;
61
	}
62

  
63
	public String getId() {
64
		return id;
65
	}
66

  
67
	public List<WorkflowTree> getChildren() {
68
		return children;
69
	}
70

  
71
	public void setChildren(final List<WorkflowTree> children) {
72
		this.children = children;
73
	}
74

  
75
	public String getName() {
76
		return name;
77
	}
78

  
79
	@Required
80
	public void setName(final String name) {
81
		this.name = name;
82
	}
83

  
84
	public org.springframework.core.io.Resource getTemplate() {
85
		return template;
86
	}
87

  
88
	@Required
89
	public void setTemplate(final org.springframework.core.io.Resource template) {
90
		this.template = template;
91
	}
92

  
93
	public String getStart() {
94
		return start;
95
	}
96

  
97
	public void setStart(final String start) {
98
		this.start = start;
99
	}
100

  
101
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/metawf/WorkflowProfileCreator.java
1
package eu.dnetlib.msro.workflows.metawf;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import org.antlr.stringtemplate.StringTemplate;
7
import org.apache.commons.io.IOUtils;
8
import org.springframework.core.io.ClassPathResource;
9
import org.springframework.core.io.Resource;
10

  
11
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
12

  
13
public class WorkflowProfileCreator {
14

  
15
	private static final Resource wfTemplate = new ClassPathResource("/eu/dnetlib/msro/workflows/templates/workflow.xml.st");
16

  
17
	public static String generateProfile(final String name, final String type, final Map<String, String> params, final Resource confTemplate)
18
			throws IOException {
19

  
20
		final StringTemplate conf = new StringTemplate(IOUtils.toString(confTemplate.getInputStream()));
21
		conf.setAttribute("params", params);
22

  
23
		final StringTemplate profile = new StringTemplate(IOUtils.toString(wfTemplate.getInputStream()));
24
		profile.setAttribute("name", name);
25
		profile.setAttribute("type", type);
26
		profile.setAttribute("priority", WorkflowsConstants.DEFAULT_WF_PRIORITY);
27
		profile.setAttribute("conf", conf.toString());
28

  
29
		return profile.toString();
30
	}
31
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/metawf/DatasourceMetaWorkflow.java
1
package eu.dnetlib.msro.workflows.metawf;
2

  
3
import java.io.IOException;
4
import java.io.StringWriter;
5
import java.util.Map;
6

  
7
import org.springframework.beans.factory.annotation.Required;
8
import org.springframework.core.io.Resource;
9

  
10
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
11
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13

  
14
public class DatasourceMetaWorkflow {
15

  
16
	private WorkflowTree tree;
17

  
18
	private Resource destroyWorkflowTemplate;
19

  
20
	@javax.annotation.Resource
21
	private UniqueServiceLocator serviceLocator;
22

  
23
	public int registerAllWorkflows(final Map<String, String> params) throws ISRegistryException, IOException {
24
		return tree.registerAllWorkflows(params);
25
	}
26

  
27
	public String registerDestroyWorkflow(final Map<String, String> params) throws ISRegistryException, IOException {
28
		final String profile = WorkflowProfileCreator.generateProfile("Repo BYE", "REPO_BYE", params, destroyWorkflowTemplate);
29
		return serviceLocator.getService(ISRegistryService.class).registerProfile(profile);
30
	}
31

  
32
	public String asXML() {
33
		final StringWriter sw = new StringWriter();
34
		tree.populateMetaWfXml(sw);
35
		return sw.toString();
36
	}
37

  
38
	public WorkflowTree getTree() {
39
		return tree;
40
	}
41

  
42
	@Required
43
	public void setTree(final WorkflowTree tree) {
44
		this.tree = tree;
45
	}
46

  
47
	public Resource getDestroyWorkflowTemplate() {
48
		return destroyWorkflowTemplate;
49
	}
50

  
51
	@Required
52
	public void setDestroyWorkflowTemplate(final Resource destroyWorkflowTemplate) {
53
		this.destroyWorkflowTemplate = destroyWorkflowTemplate;
54
	}
55

  
56
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/nodes/unpack/UnpackJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.unpack;
2

  
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Queue;
6
import java.util.concurrent.PriorityBlockingQueue;
7

  
8
import javax.xml.ws.wsaddressing.W3CEndpointReference;
9

  
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.dom4j.Document;
13
import org.dom4j.Node;
14
import org.dom4j.io.SAXReader;
15
import org.springframework.beans.factory.annotation.Required;
16

  
17
import com.googlecode.sarasvati.Arc;
18
import com.googlecode.sarasvati.NodeToken;
19

  
20
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
21
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
22
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
23

  
24
public class UnpackJobNode extends SimpleJobNode {
25

  
26
	/**
27
	 * logger.
28
	 */
29
	private static final Log log = LogFactory.getLog(UnpackJobNode.class);
30

  
31
	private String inputEprParam;
32
	private String outputEprParam;
33
	private String xpath;
34

  
35
	private IterableResultSetFactory iterableResultSetFactory;
36
	private ResultSetClientFactory resultSetClientFactory;
37

  
38
	@Override
39
	protected String execute(final NodeToken token) throws Exception {
40
		final Iterator<String> client = resultSetClientFactory.getClient(token.getEnv().getAttribute(inputEprParam)).iterator();
41
		final Queue<String> queue = new PriorityBlockingQueue<String>();
42

  
43
		if (client.hasNext()) {
44
			populateQueue(queue, client.next(), xpath);
45
		}
46

  
47
		final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(new Iterable<String>() {
48

  
49
			@Override
50
			public Iterator<String> iterator() {
51
				return new Iterator<String>() {
52

  
53
					@Override
54
					public boolean hasNext() {
55
						synchronized (queue) {
56
							return !queue.isEmpty();
57
						}
58
					}
59

  
60
					@Override
61
					public String next() {
62
						synchronized (queue) {
63
							final String res = queue.poll();
64
							while (queue.isEmpty() && client.hasNext()) {
65
								populateQueue(queue, client.next(), xpath);
66
							}
67
							return res;
68
						}
69
					}
70

  
71
					@Override
72
					public void remove() {}
73
				};
74
			}
75
		});
76

  
77
		token.getEnv().setAttribute(outputEprParam, epr.toString());
78

  
79
		return Arc.DEFAULT_ARC;
80
	}
81

  
82
	private void populateQueue(final Queue<String> queue, final String record, final String xpath) {
83
		try {
84
			final SAXReader reader = new SAXReader();
85
			final Document doc = reader.read(new StringReader(record));
86
			for (Object o : doc.selectNodes(xpath)) {
87
				queue.add(((Node) o).asXML());
88
			}
89
		} catch (Exception e) {
90
			log.error("Error unpacking record: \n" + record, e);
91
			throw new RuntimeException(e);
92
		}
93
	}
94

  
95
	public IterableResultSetFactory getIterableResultSetFactory() {
96
		return iterableResultSetFactory;
97
	}
98

  
99
	@Required
100
	public void setIterableResultSetFactory(final IterableResultSetFactory iterableResultSetFactory) {
101
		this.iterableResultSetFactory = iterableResultSetFactory;
102
	}
103

  
104
	public ResultSetClientFactory getResultSetClientFactory() {
105
		return resultSetClientFactory;
106
	}
107

  
108
	@Required
109
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
110
		this.resultSetClientFactory = resultSetClientFactory;
111
	}
112

  
113
	public String getInputEprParam() {
114
		return inputEprParam;
115
	}
116

  
117
	public void setInputEprParam(final String inputEprParam) {
118
		this.inputEprParam = inputEprParam;
119
	}
120

  
121
	public String getOutputEprParam() {
122
		return outputEprParam;
123
	}
124

  
125
	public void setOutputEprParam(final String outputEprParam) {
126
		this.outputEprParam = outputEprParam;
127
	}
128

  
129
	public String getXpath() {
130
		return xpath;
131
	}
132

  
133
	public void setXpath(final String xpath) {
134
		this.xpath = xpath;
135
	}
136

  
137
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/nodes/objectStore/RetrieveMdStoreId.java
1
package eu.dnetlib.msro.workflows.nodes.objectStore;
2

  
3
import java.util.List;
4
import java.util.Set;
5

  
6
import javax.annotation.Resource;
7

  
8
import org.springframework.beans.factory.annotation.Required;
9

  
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
12
import com.google.gson.Gson;
13
import com.googlecode.sarasvati.Arc;
14
import com.googlecode.sarasvati.NodeToken;
15

  
16
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
18
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
19
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
20

  
21
/**
22
 * The Class RetrieveMdStoreId is a job node used to retrieve the correct MDStore from which extract the url of the file to download.
23
 * metadata format and interpretation are injected as properties
24
 */
25
public class RetrieveMdStoreId extends SimpleJobNode {
26

  
27
	/** The metadata format. */
28
	private String metadataFormat;
29

  
30
	/** The interpretation. */
31
	private String interpretation;
32

  
33
	/** The provider id. */
34
	private String providerId;
35

  
36
	/** The service locator. */
37
	@Resource
38
	private UniqueServiceLocator serviceLocator;
39

  
40
	/*
41
	 * (non-Javadoc)
42
	 * 
43
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
44
	 */
45
	@Override
46
	protected String execute(final NodeToken token) throws Exception {
47

  
48
		String workflowQuery = "for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') where($x//DATAPROVIDER/@id='%s') return  distinct-values($x//WORKFLOW/@id/string())";
49

  
50
		List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(workflowQuery, providerId));
51
		if (result.size() == 0) { throw new RuntimeException("there is no mdStore Associated to the provider " + token.getEnv().getAttribute(getProviderId())); }
52
		Set<String> workflowIds = Sets.newHashSet(result);
53

  
54
		Set<String> metadataIds = getMdStores(workflowIds);
55
		Gson g = new Gson();
56
		token.getEnv().setAttribute("mdId", g.toJson(metadataIds));
57

  
58
		token.getEnv().setAttribute("mdFormat", getMetadataFormat());
59
		return Arc.DEFAULT_ARC;
60
	}
61

  
62
	private Set<String> getMdStores(final Set<String> workflowsId) {
63
		try {
64

  
65
			String query = "//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s']//PARAM[./@category/string()='MDSTORE_ID']/text()";
66

  
67
			Set<String> mdStores = Sets.newHashSet();
68

  
69
			if (workflowsId == null) { return null; }
70

  
71
			for (String workflowId : workflowsId) {
72
				List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(query, workflowId));
73
				Set<String> metadataIds = Sets.newHashSet(result);
74
				mdStores.addAll(getRightMetadataId(Lists.newArrayList(metadataIds)));
75
			}
76
			return mdStores;
77

  
78
		} catch (ISLookUpException e) {
79

  
80
			return null;
81
		}
82
	}
83

  
84
	/**
85
	 * Gets the right metadata id whith the format metadataFormat and interpretation interpretation
86
	 * 
87
	 * @return the right metadata id
88
	 * @throws ISLookUpException
89
	 */
90
	private Set<String> getRightMetadataId(final Iterable<String> ids) throws ISLookUpException {
91
		String query = "concat(//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value=\"%s\"]//METADATA_FORMAT/text(), \"::<<>>::\""
92
				+ ",//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value=\"%s\"]//METADATA_FORMAT_INTERPRETATION/text())";
93
		Set<String> result = Sets.newHashSet();
94

  
95
		for (String id : ids) {
96

  
97
			List<String> results = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(query, id, id));
98
			if (results.size() > 0) {
99
				String[] values = results.get(0).split("::<<>>::");
100
				if (metadataFormat.equals(values[0]) && interpretation.equals(values[1])) {
101
					result.add(id);
102
				}
103
			}
104
		}
105
		return result;
106

  
107
	}
108

  
109
	/**
110
	 * Gets the interpretation.
111
	 * 
112
	 * @return the interpretation
113
	 */
114
	public String getInterpretation() {
115
		return interpretation;
116
	}
117

  
118
	/**
119
	 * Sets the interpretation.
120
	 * 
121
	 * @param interpretation
122
	 *            the interpretation to set
123
	 */
124
	@Required
125
	public void setInterpretation(final String interpretation) {
126
		this.interpretation = interpretation;
127
	}
128

  
129
	/**
130
	 * Gets the metadata format.
131
	 * 
132
	 * @return the metadataFormat
133
	 */
134
	public String getMetadataFormat() {
135
		return metadataFormat;
136
	}
137

  
138
	/**
139
	 * Sets the metadata format.
140
	 * 
141
	 * @param metadataFormat
142
	 *            the metadataFormat to set
143
	 */
144
	@Required
145
	public void setMetadataFormat(final String metadataFormat) {
146
		this.metadataFormat = metadataFormat;
147
	}
148

  
149
	/**
150
	 * Gets the provider id.
151
	 * 
152
	 * @return the providerId
153
	 */
154
	public String getProviderId() {
155
		return providerId;
156
	}
157

  
158
	/**
159
	 * Sets the provider id.
160
	 * 
161
	 * @param providerId
162
	 *            the providerId to set
163
	 */
164
	public void setProviderId(final String providerId) {
165
		this.providerId = providerId;
166
	}
167

  
168
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/nodes/objectStore/UpdateObjectStoreSizeJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.objectStore;
2

  
3
import javax.annotation.Resource;
4

  
5
import com.googlecode.sarasvati.Arc;
6
import com.googlecode.sarasvati.NodeToken;
7

  
8
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService;
9
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
10
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
11
import eu.dnetlib.miscutils.datetime.DateUtils;
12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13

  
14
public class UpdateObjectStoreSizeJobNode extends SimpleJobNode {
15

  
16
	/** The obejct store id. */
17
	private String obejctStoreIdParam;
18

  
19
	/** The service locator. */
20
	@Resource
21
	private UniqueServiceLocator serviceLocator;
22

  
23
	@Override
24
	protected String execute(final NodeToken token) throws Exception {
25

  
26
		final ISRegistryService registry = serviceLocator.getService(ISRegistryService.class);
27

  
28
		int size = serviceLocator.getService(ObjectStoreService.class, obejctStoreIdParam).getSize(obejctStoreIdParam);
29

  
30
		String now = DateUtils.now_ISO8601();
31

  
32
		String mdstoreXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + obejctStoreIdParam + "']"
33
				+ "return update value $x//LAST_STORAGE_DATE with '" + now + "'";
34

  
35
		registry.executeXUpdate(mdstoreXUpdate);
36

  
37
		String mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + obejctStoreIdParam + "']"
38
				+ "return update value $x//COUNT_STORE with '" + size + "'";
39

  
40
		registry.executeXUpdate(mdstoreNumberXUpdate);
41

  
42
		mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + obejctStoreIdParam + "']"
43
				+ "return update value $x//STORE_SIZE with '" + size + "'";
44

  
45
		registry.executeXUpdate(mdstoreNumberXUpdate);
46

  
47
		return Arc.DEFAULT_ARC;
48
	}
49

  
50
	/**
51
	 * @return the obejctStoreIdParam
52
	 */
53
	public String getObejctStoreIdParam() {
54
		return obejctStoreIdParam;
55
	}
56

  
57
	/**
58
	 * @param obejctStoreIdParam
59
	 *            the obejctStoreIdParam to set
60
	 */
61
	public void setObejctStoreIdParam(final String obejctStoreIdParam) {
62
		this.obejctStoreIdParam = obejctStoreIdParam;
63
	}
64

  
65
}
modules/dnet-msro-service/tags/dnet-msro-service-2.0.3/src/main/java/eu/dnetlib/msro/workflows/nodes/objectStore/DownloadIntoObjectStoreJobNode.java
1
package eu.dnetlib.msro.workflows.nodes.objectStore;
2

  
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Map;
6

  
7
import javax.xml.ws.wsaddressing.W3CEndpointReference;
8
import javax.xml.xpath.XPath;
9
import javax.xml.xpath.XPathFactory;
10

  
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.xml.sax.InputSource;
14

  
15
import com.googlecode.sarasvati.Engine;
16
import com.googlecode.sarasvati.NodeToken;
17
import com.googlecode.sarasvati.env.Env;
18

  
19
import eu.dnetlib.data.objectstore.rmi.MetadataObjectRecord;
20
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
21
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
22
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
23
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
24
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
25
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
26
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
27
import eu.dnetlib.msro.workflows.util.ProgressProvider;
28
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
29
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
30

  
31
public class DownloadIntoObjectStoreJobNode extends BlackboardJobNode implements ProgressJobNode {
32

  
33
	private static final Log log = LogFactory.getLog(DownloadIntoObjectStoreJobNode.class);
34

  
35
	class MetadataObjectIterator implements Iterator<String> {
36

  
37
		private Iterator<String> inputIterator;
38

  
39
		private String mime;
40

  
41
		public MetadataObjectIterator(final Iterator<String> inputIterator, final String xpath, final String mime) {
42
			this.inputIterator = inputIterator;
43
		}
44

  
45
		@Override
46
		public boolean hasNext() {
47
			return inputIterator.hasNext();
48
		}
49

  
50
		@Override
51
		public String next() {
52
			try {
53
				String record = inputIterator.next();
54
				XPath xpath = XPathFactory.newInstance().newXPath();
55
				InputSource doc = new InputSource(new StringReader(record));
56
				String identifier = xpath.evaluate(getIdXpath(), doc);
57
				MetadataObjectRecord objectrecord = new MetadataObjectRecord(identifier, record, mime);
58
				return objectrecord.toJSON();
59
			} catch (Exception e) {
60
				return null;
61
			}
62
		}
63

  
64
		@Override
65
		public void remove() {
66

  
67
		}
68

  
69
	}
70

  
71
	private String eprParam;
72

  
73
	private String objectStoreId;
74

  
75
	private String idXpath; // "//*[local-name()='objIdentifier']
76

  
77
	private String contentDescription;
78

  
79
	private String objectIsInsideEpr;
80

  
81
	private IterableResultSetFactory iterableResultSetFactory;
82
	private ResultSetClientFactory resultSetClientFactory;
83

  
84
	private ResultsetProgressProvider progressProvider;
85
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
86

  
87
	public String getEprParam() {
88
		return eprParam;
89
	}
90

  
91
	public void setEprParam(final String eprParam) {
92
		this.eprParam = eprParam;
93
	}
94

  
95
	public String getObjectStoreId() {
96
		return objectStoreId;
97
	}
98

  
99
	public void setObjectStoreId(final String objectStoreId) {
100
		this.objectStoreId = objectStoreId;
101
	}
102

  
103
	public ProgressProvider getProgressProvider(final NodeToken token) {
104

  
105
		return progressProvider;
106
	}
107

  
108
	@Override
109
	protected String getXqueryForServiceId(final NodeToken token) {
110
		return "//RESOURCE_IDENTIFIER[../RESOURCE_TYPE/@value='ObjectStoreServiceResourceType']/@value/string()";
111
	}
112

  
113
	@Override
114
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
115

  
116
		job.setAction("FEEDOBJECT");
117
		final String eprS = token.getEnv().getAttribute(getEprParam());
118
		job.getParameters().put("obsID", getObjectStoreId());
119
		job.getParameters().put("mime", getContentDescription());
120
		final Iterator<String> client = resultSetClientFactory.getClient(eprS).iterator();
121

  
122
		final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(new Iterable<String>() {
123

  
124
			@Override
125
			public Iterator<String> iterator() {
126
				return new MetadataObjectIterator(client, "//*[local-name()='objIdentifier']", "xml");
127
			}
128
		});
129
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
130
		job.getParameters().put("epr", progressProvider.getEpr().toString());
131

  
132
	}
133

  
134
	@Override
135
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
136
		return new BlackboardWorkflowJobListener(engine, token) {
137

  
138
			@Override
139
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
140
				log.info("Number of stored records: " + responseParams.get("total"));
141
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total"));
142
			}
143
		};
144
	}
145

  
146
	public String getObjectIsInsideEpr() {
147
		return objectIsInsideEpr;
148
	}
149

  
150
	public void setObjectIsInsideEpr(final String objectIsInsideEpr) {
151
		this.objectIsInsideEpr = objectIsInsideEpr;
152
	}
153

  
154
	@Override
155
	public ResultsetProgressProvider getProgressProvider() {
156
		return progressProvider;
157
	}
158

  
159
	public void setProgressProvider(final ResultsetProgressProvider progressProvider) {
160
		this.progressProvider = progressProvider;
161
	}
162

  
163
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
164
		return processCountingResultSetFactory;
165
	}
166

  
167
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
168
		this.processCountingResultSetFactory = processCountingResultSetFactory;
169
	}
170

  
171
	public ResultSetClientFactory getResultSetClientFactory() {
172
		return resultSetClientFactory;
173
	}
174

  
175
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
176
		this.resultSetClientFactory = resultSetClientFactory;
177
	}
178

  
179
	public IterableResultSetFactory getIterableResultSetFactory() {
180
		return iterableResultSetFactory;
181
	}
182

  
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff