Project

General

Profile

« Previous | Next » 

Revision 36494

Partial wf nodes reimplementation

View differences:

modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/AsyncJobNode.java
1
package eu.dnetlib.msro.worker.nodes;
2

  
3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5

  
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11

  
12
import eu.dnetlib.msro.worker.WorkflowConstants;
13

  
14
public abstract class AsyncJobNode extends SarasvatiJobNode {
15

  
16
	/**
17
	 * logger.
18
	 */
19
	private static final Log log = LogFactory.getLog(AsyncJobNode.class);
20

  
21
	private final ExecutorService executor = Executors.newCachedThreadPool();
22

  
23
	@Override
24
	public void execute(final Engine engine, final NodeToken token) {
25
		super.execute(engine, token);
26

  
27
		log.info("executing async node");
28

  
29
		executor.execute(new Runnable() {
30

  
31
			@Override
32
			public void run() {
33
				try {
34
					log.debug("START NODE: " + getBeanName());
35
					beforeStart(token);
36
					String arc = execute(token);
37
					beforeCompleted(token);
38
					log.debug("END NODE (SUCCESS): " + getBeanName());
39
					engine.complete(token, arc);
40
				} catch (Throwable e) {
41
					log.error("got exception while executing workflow node", e);
42
					log.debug("END NODE (FAILED): " + getBeanName());
43
					beforeFailed(token);
44
					token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true);
45
					token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, e.getMessage());
46
					engine.complete(token, "failed");
47
				}
48
			}
49
		});
50
	}
51

  
52
	abstract protected String execute(final NodeToken token) throws Exception;
53

  
54
	protected void beforeStart(final NodeToken token) {}
55

  
56
	protected void beforeCompleted(final NodeToken token) {}
57

  
58
	protected void beforeFailed(final NodeToken token) {}
59
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/unpack/UnpackJobNode.java
1
package eu.dnetlib.msro.worker.nodes.unpack;
2

  
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Queue;
6
import java.util.concurrent.PriorityBlockingQueue;
7

  
8
import javax.annotation.Resource;
9

  
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.dom4j.Document;
13
import org.dom4j.Node;
14
import org.dom4j.io.SAXReader;
15

  
16
import com.googlecode.sarasvati.Arc;
17
import com.googlecode.sarasvati.NodeToken;
18

  
19
import eu.dnetlib.data.resultSet.ResultSetFactory;
20
import eu.dnetlib.data.resultSet.client.ResultSetClientFactory;
21
import eu.dnetlib.msro.worker.nodes.SimpleJobNode;
22
import eu.dnetlib.rmi.objects.resultSet.ResultSet;
23

  
24
public class UnpackJobNode extends SimpleJobNode {
25

  
26
	/**
27
	 * logger.
28
	 */
29
	private static final Log log = LogFactory.getLog(UnpackJobNode.class);
30

  
31
	@Resource
32
	private ResultSetClientFactory resultSetClientFactory;
33
	@Resource
34
	private ResultSetFactory resultSetFactory;
35

  
36
	private String inputResultSetParam;
37
	private String outputResultSetParam;
38
	private String xpath;
39

  
40
	@Override
41
	protected String execute(final NodeToken token) throws Exception {
42
		@SuppressWarnings("unchecked")
43
		final ResultSet<String> rsInput = (ResultSet<String>) token.getEnv().getTransientAttribute(inputResultSetParam);
44

  
45
		final Iterator<String> client = resultSetClientFactory.getClient(rsInput, String.class, 100).iterator();
46
		final Queue<String> queue = new PriorityBlockingQueue<String>();
47

  
48
		if (client.hasNext()) {
49
			populateQueue(queue, client.next(), xpath);
50
		}
51

  
52
		final ResultSet<String> rsOutput = resultSetFactory.createResultSet(new Iterable<String>() {
53

  
54
			@Override
55
			public Iterator<String> iterator() {
56
				return new Iterator<String>() {
57

  
58
					@Override
59
					public boolean hasNext() {
60
						synchronized (queue) {
61
							return !queue.isEmpty();
62
						}
63
					}
64

  
65
					@Override
66
					public String next() {
67
						synchronized (queue) {
68
							final String res = queue.poll();
69
							while (queue.isEmpty() &&
70
									client.hasNext()) {
71
								populateQueue(queue, client.next(), xpath);
72
							}
73
							return res;
74
						}
75
					}
76

  
77
					@Override
78
					public void remove() {}
79
				};
80
			}
81
		});
82

  
83
		token.getEnv().setTransientAttribute(outputResultSetParam, rsOutput);
84

  
85
		return Arc.DEFAULT_ARC;
86
	}
87

  
88
	private void populateQueue(final Queue<String> queue, final String record, final String xpath) {
89
		try {
90
			final SAXReader reader = new SAXReader();
91
			final Document doc = reader.read(new StringReader(record));
92
			for (Object o : doc.selectNodes(xpath)) {
93
				queue.add(((Node) o).asXML());
94
			}
95
		} catch (Exception e) {
96
			log.error("Error unpacking record: \n" + record, e);
97
			throw new RuntimeException(e);
98
		}
99
	}
100

  
101
	public String getXpath() {
102
		return xpath;
103
	}
104

  
105
	public void setXpath(final String xpath) {
106
		this.xpath = xpath;
107
	}
108

  
109
	public String getInputResultSetParam() {
110
		return inputResultSetParam;
111
	}
112

  
113
	public void setInputResultSetParam(final String inputResultSetParam) {
114
		this.inputResultSetParam = inputResultSetParam;
115
	}
116

  
117
	public String getOutputResultSetParam() {
118
		return outputResultSetParam;
119
	}
120

  
121
	public void setOutputResultSetParam(final String outputResultSetParam) {
122
		this.outputResultSetParam = outputResultSetParam;
123
	}
124

  
125
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/BlackboardJobNode.java
1
package eu.dnetlib.msro.worker.nodes;
2

  
3
import javax.annotation.Resource;
4

  
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12
import com.googlecode.sarasvati.env.Env;
13

  
14
import eu.dnetlib.common.services.locators.DnetServiceLocator;
15
import eu.dnetlib.enabling.blackboard.BlackboardCallback;
16
import eu.dnetlib.enabling.blackboard.BlackboardDispatcher;
17
import eu.dnetlib.msro.worker.WorkflowConstants;
18
import eu.dnetlib.rmi.blackboard.AbstractBlackboardMessage;
19

  
20
public abstract class BlackboardJobNode<T> extends SarasvatiJobNode implements BlackboardCallback<T> {
21

  
22
	/**
23
	 * logger.
24
	 */
25
	private static final Log log = LogFactory.getLog(BlackboardJobNode.class);
26

  
27
	@Resource
28
	private DnetServiceLocator serviceLocator;
29

  
30
	@Resource
31
	private BlackboardDispatcher blackboardDispatcher;
32

  
33
	private Engine engine;
34

  
35
	private NodeToken token;
36

  
37
	@Override
38
	public void execute(final Engine engine, final NodeToken token) {
39
		super.execute(engine, token);
40

  
41
		this.engine = engine;
42
		this.token = token;
43

  
44
		log.info("executing blackboard node");
45

  
46
		try {
47
			token.getEnv().setAttribute(WorkflowConstants.BLACKBOARD_IS_BLACKBOARD, true);
48

  
49
			final String serviceId = obtainServiceId(token);
50
			if (StringUtils.isBlank(serviceId)) {
51
				token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true);
52
				final String msg = "cannot locate target service profile: " + serviceId;
53
				token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, msg);
54
				log.error(msg);
55
				engine.complete(token, "failed");
56
				return;
57
			}
58
			blackboardDispatcher.createDispatcher(serviceId, getMessage(), this);
59
		} catch (final Throwable e) {
60
			token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true);
61
			token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, "cannot prepare blackboard job: " + e);
62
			engine.complete(token, "failed");
63
			log.error("cannot prepare blackboard job", e);
64
		}
65
	}
66

  
67
	abstract protected T getMessage();
68

  
69
	abstract protected String obtainServiceId(NodeToken token);
70

  
71
	@Override
72
	public void onSuccess(final T t) {
73
		log.debug("Blackboard workflow node DONE");
74
		saveResponseInEnv(t, token.getEnv());
75
		engine.complete(token, Arc.DEFAULT_ARC);
76
		engine.executeQueuedArcTokens(token.getProcess());
77
	}
78

  
79
	abstract protected void saveResponseInEnv(final T t, final Env env);
80

  
81
	@Override
82
	public void onFail(final T t) {
83
		final String error = t instanceof AbstractBlackboardMessage ? ((AbstractBlackboardMessage) t).getError() : "Unknown Error";
84
		log.warn("Blackboard workflow node FAILED: " + error);
85
		saveResponseInEnv(t, token.getEnv());
86
		token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true);
87
		token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, error);
88
		engine.complete(token, "failed");
89
	}
90

  
91
	@Override
92
	public void onOngoing(final T t) {
93
		getToken().getEnv().setAttribute(WorkflowConstants.BLACKBOARD_IS_GOING, true);
94
	}
95

  
96
	public Engine getEngine() {
97
		return engine;
98
	}
99

  
100
	public void setEngine(final Engine engine) {
101
		this.engine = engine;
102
	}
103

  
104
	public NodeToken getToken() {
105
		return token;
106
	}
107

  
108
	public void setToken(final NodeToken token) {
109
		this.token = token;
110
	}
111

  
112
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/sel/SelectiveNode.java
1
package eu.dnetlib.msro.worker.nodes.sel;
2

  
3
import com.googlecode.sarasvati.Arc;
4
import com.googlecode.sarasvati.NodeToken;
5

  
6
import eu.dnetlib.msro.worker.nodes.SimpleJobNode;
7

  
8
/**
9
 * The Class SelectiveNode allows to decide which path a workflow must take.
10
 */
11
public class SelectiveNode extends SimpleJobNode {
12

  
13
	/** The selection. */
14
	private String selection = Arc.DEFAULT_ARC;
15

  
16
	/*
17
	 * (non-Javadoc)
18
	 * 
19
	 * @see eu.dnetlib.msro.worker.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
20
	 */
21
	@Override
22
	protected String execute(final NodeToken token) throws Exception {
23
		return selection;
24
	}
25

  
26
	/**
27
	 * Gets the selection.
28
	 *
29
	 * @return the selection
30
	 */
31
	public String getSelection() {
32
		return selection;
33
	}
34

  
35
	/**
36
	 * Sets the selection.
37
	 *
38
	 * @param selection
39
	 *            the new selection
40
	 */
41
	public void setSelection(final String selection) {
42
		this.selection = selection;
43
	}
44

  
45
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/objectStore/RetrieveMdStoreId.java
1
package eu.dnetlib.msro.worker.nodes.objectStore;
2

  
3
import java.util.List;
4
import java.util.Set;
5

  
6
import javax.annotation.Resource;
7

  
8
import org.springframework.beans.factory.annotation.Required;
9

  
10
import com.google.common.collect.Lists;
11
import com.google.common.collect.Sets;
12
import com.google.gson.Gson;
13
import com.google.gson.GsonBuilder;
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16

  
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
18
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
19

  
20
import eu.dnetlib.msro.worker.nodes.SimpleJobNode;
21

  
22
/**
23
 * The Class RetrieveMdStoreId is a job node used to retrieve the correct MDStore from which extract the url of the file to download.
24
 * metadata format and interpretation are injected as properties
25
 */
26
public class RetrieveMdStoreId extends SimpleJobNode {
27

  
28
	/** The metadata format. */
29
	private String metadataFormat;
30

  
31
	/** The interpretation. */
32
	private String interpretation;
33

  
34
	/** The provider id. */
35
	private String providerId;
36

  
37
	/** The service locator. */
38
	@Resource
39
	private DnetServiceLocator serviceLocator;
40

  
41
	/*
42
	 * (non-Javadoc)
43
	 *
44
	 * @see eu.dnetlib.msro.worker.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
45
	 */
46
	@Override
47
	protected String execute(final NodeToken token) throws Exception {
48

  
49
		String workflowQuery =
50
				"for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') where($x//DATAPROVIDER/@id='%s') return  distinct-values($x//WORKFLOW/@id/string())";
51

  
52
		List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(workflowQuery, providerId));
53
		if (result.size() == 0) { throw new RuntimeException("there is no mdStore Associated to the provider " + token.getEnv().getAttribute(getProviderId())); }
54
		Set<String> workflowIds = Sets.newHashSet(result);
55

  
56
		Set<String> metadataIds = getMdStores(workflowIds);
57
		Gson g = new GsonBuilder().disableHtmlEscaping().create();
58
		token.getEnv().setAttribute("mdId", g.toJson(metadataIds));
59

  
60
		token.getEnv().setAttribute("mdFormat", getMetadataFormat());
61
		return Arc.DEFAULT_ARC;
62
	}
63

  
64
	private Set<String> getMdStores(final Set<String> workflowsId) {
65
		try {
66

  
67
			String query = "//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s']//PARAM[./@category/string()='MDSTORE_ID']/text()";
68

  
69
			Set<String> mdStores = Sets.newHashSet();
70

  
71
			if (workflowsId == null) { return null; }
72

  
73
			for (String workflowId : workflowsId) {
74
				List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(query, workflowId));
75
				Set<String> metadataIds = Sets.newHashSet(result);
76
				mdStores.addAll(getRightMetadataId(Lists.newArrayList(metadataIds)));
77
			}
78
			return mdStores;
79

  
80
		} catch (ISLookUpException e) {
81

  
82
			return null;
83
		}
84
	}
85

  
86
	/**
87
	 * Gets the right metadata id whith the format metadataFormat and interpretation interpretation
88
	 *
89
	 * @return the right metadata id
90
	 * @throws ISLookUpException
91
	 */
92
	private Set<String> getRightMetadataId(final Iterable<String> ids) throws ISLookUpException {
93
		String query =
94
				"let $x:=//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s'] return concat($x//METADATA_FORMAT/text(), '::<<>>::', $x//METADATA_FORMAT_INTERPRETATION/text())";
95
		Set<String> result = Sets.newHashSet();
96

  
97
		for (String id : ids) {
98

  
99
			List<String> results = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(query, id));
100
			if (results.size() > 0) {
101
				String[] values = results.get(0).split("::<<>>::");
102
				if (metadataFormat.equals(values[0]) && interpretation.equals(values[1])) {
103
					result.add(id);
104
				}
105
			}
106
		}
107
		return result;
108

  
109
	}
110

  
111
	/**
112
	 * Gets the interpretation.
113
	 *
114
	 * @return the interpretation
115
	 */
116
	public String getInterpretation() {
117
		return interpretation;
118
	}
119

  
120
	/**
121
	 * Sets the interpretation.
122
	 *
123
	 * @param interpretation
124
	 *            the interpretation to set
125
	 */
126
	@Required
127
	public void setInterpretation(final String interpretation) {
128
		this.interpretation = interpretation;
129
	}
130

  
131
	/**
132
	 * Gets the metadata format.
133
	 *
134
	 * @return the metadataFormat
135
	 */
136
	public String getMetadataFormat() {
137
		return metadataFormat;
138
	}
139

  
140
	/**
141
	 * Sets the metadata format.
142
	 *
143
	 * @param metadataFormat
144
	 *            the metadataFormat to set
145
	 */
146
	@Required
147
	public void setMetadataFormat(final String metadataFormat) {
148
		this.metadataFormat = metadataFormat;
149
	}
150

  
151
	/**
152
	 * Gets the provider id.
153
	 *
154
	 * @return the providerId
155
	 */
156
	public String getProviderId() {
157
		return providerId;
158
	}
159

  
160
	/**
161
	 * Sets the provider id.
162
	 *
163
	 * @param providerId
164
	 *            the providerId to set
165
	 */
166
	public void setProviderId(final String providerId) {
167
		this.providerId = providerId;
168
	}
169

  
170
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/objectStore/UpdateObjectStoreSizeJobNode.java
1
package eu.dnetlib.msro.worker.nodes.objectStore;
2

  
3
import javax.annotation.Resource;
4

  
5
import com.googlecode.sarasvati.Arc;
6
import com.googlecode.sarasvati.NodeToken;
7

  
8
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService;
9
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
10

  
11
import eu.dnetlib.miscutils.datetime.DateUtils;
12
import eu.dnetlib.msro.worker.nodes.SimpleJobNode;
13

  
14
// TODO: Auto-generated Javadoc
15
/**
16
 * The Class UpdateObjectStoreSizeJobNode.
17
 */
18
public class UpdateObjectStoreSizeJobNode extends SimpleJobNode {
19

  
20
	/** The object store id. */
21
	private String objectStoreIdParam;
22

  
23
	/** The service locator. */
24
	@Resource
25
	private DnetServiceLocator serviceLocator;
26

  
27
	/*
28
	 * (non-Javadoc)
29
	 * 
30
	 * @see eu.dnetlib.msro.worker.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
31
	 */
32
	@Override
33
	protected String execute(final NodeToken token) throws Exception {
34

  
35
		final ISRegistryService registry = serviceLocator.getService(ISRegistryService.class);
36

  
37
		int size = serviceLocator.getService(ObjectStoreService.class, objectStoreIdParam).getSize(objectStoreIdParam);
38

  
39
		String now = DateUtils.now_ISO8601();
40

  
41
		String mdstoreXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + objectStoreIdParam + "']"
42
				+ "return update value $x//LAST_STORAGE_DATE with '" + now + "'";
43

  
44
		registry.executeXUpdate(mdstoreXUpdate);
45

  
46
		String mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + objectStoreIdParam + "']"
47
				+ "return update value $x//COUNT_STORE with '" + size + "'";
48

  
49
		registry.executeXUpdate(mdstoreNumberXUpdate);
50

  
51
		mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + objectStoreIdParam + "']"
52
				+ "return update value $x//STORE_SIZE with '" + size + "'";
53

  
54
		registry.executeXUpdate(mdstoreNumberXUpdate);
55

  
56
		return Arc.DEFAULT_ARC;
57
	}
58

  
59
	/**
60
	 * Gets the object store id param.
61
	 *
62
	 * @return the objectStoreIdParam
63
	 */
64
	public String getObjectStoreIdParam() {
65
		return objectStoreIdParam;
66
	}
67

  
68
	/**
69
	 * Sets the object store id param.
70
	 *
71
	 * @param objectStoreIdParam
72
	 *            the new object store id param
73
	 */
74
	public void setObjectStoreIdParam(final String objectStoreIdParam) {
75
		this.objectStoreIdParam = objectStoreIdParam;
76
	}
77

  
78
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/repobye/DeleteMetaWfJobNode.java
1
package eu.dnetlib.msro.worker.nodes.repobye;
2

  
3
import java.io.StringReader;
4
import java.io.StringWriter;
5

  
6
import javax.annotation.Resource;
7

  
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.dom4j.Document;
11
import org.dom4j.Node;
12
import org.dom4j.io.SAXReader;
13

  
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16

  
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
18
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
19

  
20
import eu.dnetlib.msro.worker.nodes.SimpleJobNode;
21
import eu.dnetlib.msro.workflows.util.WorkflowConstants;
22

  
23
public class DeleteMetaWfJobNode extends SimpleJobNode {
24

  
25
	private String metaWfId;
26

  
27
	@Resource
28
	private DnetServiceLocator serviceLocator;
29

  
30
	private static final Log log = LogFactory.getLog(DeleteMetaWfJobNode.class);
31

  
32
	@Override
33
	protected String execute(final NodeToken token) throws Exception {
34
		final String profile = serviceLocator.getService(ISLookUpService.class).getResourceProfile(metaWfId);
35
		final Document doc = new SAXReader().read(new StringReader(profile));
36

  
37
		final String dsId = doc.valueOf("//DATAPROVIDER/@id");
38
		final String dsName = doc.valueOf("//DATAPROVIDER/text()");
39
		final String ifaceId = doc.valueOf("//DATAPROVIDER/@interface");
40
		final String destroyWfId = doc.valueOf("//CONFIGURATION/@destroyWorkflow");
41

  
42
		log.info("Removing a MetaWf of dataprovider: " + dsId);
43

  
44
		token.getEnv().setAttribute(WorkflowConstants.DATAPROVIDER_ID, dsId);
45
		token.getEnv().setAttribute(WorkflowConstants.DATAPROVIDER_NAME, dsName);
46
		token.getEnv().setAttribute(WorkflowConstants.DATAPROVIDER_INTERFACE, ifaceId);
47

  
48
		final ISRegistryService registry = serviceLocator.getService(ISRegistryService.class);
49

  
50
		for (Object o : doc.selectNodes("//WORKFLOW")) {
51
			final String wfId = ((Node) o).valueOf("@id");
52
			try {
53
				registry.deleteProfile(wfId);
54
				log.info(" - Deleted Workflow: " + wfId);
55
			} catch (Exception e) {
56
				log.error(" - (ERR) Error deleting profile " + wfId);
57
			}
58
		}
59
		registry.deleteProfile(metaWfId);
60
		log.info(" - Deleted MetaWorkflow: " + metaWfId);
61

  
62
		registry.deleteProfile(destroyWfId);
63
		log.info(" - Deleted destroy workflow: " + destroyWfId);
64

  
65
		verifyDatasource(dsId, ifaceId);
66

  
67
		return Arc.DEFAULT_ARC;
68
	}
69

  
70
	private void verifyDatasource(final String dsId, final String ifaceId) throws Exception {
71
		final StringWriter sw = new StringWriter();
72

  
73
		sw.append("for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') where");
74
		sw.append("  $x//DATAPROVIDER/@id = '" + dsId + "' and ");
75
		sw.append("  $x//DATAPROVIDER/@interface = '" + ifaceId + "' and ");
76
		sw.append("  $x//RESOURCE_IDENTIFIER/@value != '" + metaWfId + "' ");
77
		sw.append("return $x//RESOURCE_IDENTIFIER/@value/string()");
78

  
79
		final boolean active = !serviceLocator.getService(ISLookUpService.class).quickSearchProfile(sw.toString()).isEmpty();
80

  
81
		log.info(" - Updating iface, active status: " + active);
82

  
83
		updateIfaceActivationStatus(dsId, ifaceId, active);
84
	}
85

  
86
	protected void updateIfaceActivationStatus(final String dsId, final String ifaceId, final boolean active) throws Exception {
87
		serviceLocator.getService(ISRegistryService.class).updateProfileNode(dsId, "//INTERFACE[@id = '" + ifaceId + "']/@active", "'" + active + "'");
88
	}
89

  
90
	public String getMetaWfId() {
91
		return metaWfId;
92
	}
93

  
94
	public void setMetaWfId(final String metaWfId) {
95
		this.metaWfId = metaWfId;
96
	}
97

  
98
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/SuccessFailureNode.java
1
package eu.dnetlib.msro.worker.nodes;
2

  
3
import java.util.Map;
4

  
5
import javax.annotation.Resource;
6

  
7
import org.antlr.stringtemplate.StringTemplate;
8
import org.apache.commons.io.IOUtils;
9
import org.apache.commons.lang.StringEscapeUtils;
10
import org.apache.commons.lang.StringUtils;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.springframework.beans.factory.annotation.Required;
14

  
15
import com.google.common.collect.Maps;
16
import com.googlecode.sarasvati.Arc;
17
import com.googlecode.sarasvati.NodeToken;
18

  
19
import eu.dnetlib.common.services.locators.DnetServiceLocator;
20
import eu.dnetlib.miscutils.DateUtils;
21
import eu.dnetlib.msro.worker.WorkflowConstants;
22

  
23
/**
24
 * The success node sets the "isCompletedSuccessfully" env var.
25
 * 
26
 */
27
public class SuccessFailureNode extends SimpleJobNode {
28

  
29
	/**
30
	 * is completed successfully.
31
	 */
32
	private boolean success;
33

  
34
	@Resource
35
	private DnetServiceLocator serviceLocator;
36

  
37
	private static final Log log = LogFactory.getLog(SuccessFailureNode.class);
38

  
39
	@Override
40
	protected String execute(final NodeToken token) {
41
		final String profileId = token.getFullEnv().getAttribute(WorkflowConstants.SYSTEM_WF_PROFILE_ID).trim();
42

  
43
		final long now = DateUtils.now();
44
		final String date = DateUtils.getDate_ISO8601(now);
45

  
46
		token.getProcess().getEnv().setAttribute(WorkflowConstants.SYSTEM_END_DATE, now);
47
		token.getProcess().getEnv().setAttribute(WorkflowConstants.SYSTEM_END_HUMAN_DATE, date);
48

  
49
		final Map<String, String> params = mergeEnvAttributes(token);
50
		try {
51
			final String template = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/msro/workflows/templates/workflow_status.xml.st"));
52
			final StringTemplate st = new StringTemplate(template);
53
			st.setAttribute("procId", StringEscapeUtils.escapeXml(params.get(WorkflowConstants.SYSTEM_WF_PROCESS_ID)));
54
			st.setAttribute("date", StringEscapeUtils.escapeXml(date));
55
			st.setAttribute("params", filterOutputParams(params));
56
			if (!isSuccess()) {
57
				st.setAttribute("error", StringEscapeUtils.escapeXml(params.get(WorkflowConstants.SYSTEM_ERROR)));
58
			}
59

  
60
			token.getProcess().getEnv().setAttribute(WorkflowConstants.SYSTEM_COMPLETED_SUCCESSFULLY, isSuccess());
61
		} catch (Exception e) {
62
			log.error("Error updating workflow profile: " + profileId, e);
63
			token.getProcess().getEnv().setAttribute(WorkflowConstants.SYSTEM_COMPLETED_SUCCESSFULLY, false);
64
		}
65

  
66
		return Arc.DEFAULT_ARC;
67
	}
68

  
69
	private Map<String, String> filterOutputParams(final Map<String, String> map) {
70
		final Map<String, String> res = Maps.newHashMap();
71

  
72
		if (map != null) {
73
			for (String k : map.keySet()) {
74
				if (!StringUtils.isBlank(k) && (k.startsWith(WorkflowConstants.DATAPROVIDER_PREFIX) || k.startsWith(WorkflowConstants.MAIN_LOG_PREFIX))) {
75
					final String key = StringEscapeUtils.escapeXml(k);
76
					final String v = map.get(k);
77
					res.put(key, v != null ? StringEscapeUtils.escapeXml(v) : "null");
78
				}
79
			}
80
		}
81

  
82
		return res;
83
	}
84

  
85
	private Map<String, String> mergeEnvAttributes(final NodeToken token) {
86
		final Map<String, String> map = Maps.newHashMap();
87

  
88
		for (String s : token.getEnv().getAttributeNames()) {
89
			map.put(s, token.getEnv().getAttribute(s));
90
		}
91
		for (String s : token.getFullEnv().getAttributeNames()) {
92
			map.put(s, token.getFullEnv().getAttribute(s));
93
		}
94
		return map;
95
	}
96

  
97
	public boolean isSuccess() {
98
		return success;
99
	}
100

  
101
	@Required
102
	public void setSuccess(final boolean success) {
103
		this.success = success;
104
	}
105

  
106
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/index/PrepareCreateIndexJobNode.java
1
package eu.dnetlib.msro.worker.nodes.index;
2

  
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

  
6
import com.googlecode.sarasvati.NodeToken;
7

  
8
import eu.dnetlib.msro.worker.nodes.SimpleJobNode;
9

  
10
public class PrepareCreateIndexJobNode extends SimpleJobNode {
11

  
12
	private static final Log log = LogFactory.getLog(PrepareCreateIndexJobNode.class);
13

  
14
	private String layout;
15
	private String format;
16
	private String interpretation;
17

  
18
	@Override
19
	protected String execute(final NodeToken token) throws Exception {
20
		log.info("Preparing env for CreateIndexJobNode");
21
		token.getEnv().setAttribute("layout", layout);
22
		token.getEnv().setAttribute("format", format);
23
		token.getEnv().setAttribute("interpretation", interpretation);
24
		return null;
25
	}
26

  
27
	public String getLayout() {
28
		return layout;
29
	}
30

  
31
	public void setLayout(final String layout) {
32
		this.layout = layout;
33
	}
34

  
35
	public String getFormat() {
36
		return format;
37
	}
38

  
39
	public void setFormat(final String format) {
40
		this.format = format;
41
	}
42

  
43
	public String getInterpretation() {
44
		return interpretation;
45
	}
46

  
47
	public void setInterpretation(final String interpretation) {
48
		this.interpretation = interpretation;
49
	}
50

  
51
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/download/DownloadFromMetadataJobNode.java
1
package eu.dnetlib.msro.worker.nodes.download;
2

  
3
import java.util.Map;
4

  
5
import org.springframework.beans.factory.annotation.Autowired;
6

  
7
import com.googlecode.sarasvati.Engine;
8
import com.googlecode.sarasvati.NodeToken;
9
import com.googlecode.sarasvati.env.Env;
10

  
11
import eu.dnetlib.data.download.rmi.DownloadService;
12
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
13
import eu.dnetlib.msro.worker.nodes.BlackboardJobNode;
14
import eu.dnetlib.msro.worker.nodes.ProgressJobNode;
15
import eu.dnetlib.msro.worker.nodes.blackboard.BlackboardWorkflowJobListener;
16
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
17
import eu.dnetlib.msro.workflows.util.ProgressProvider;
18
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
19
import eu.dnetlib.msro.workflows.util.WorkflowConstants;
20

  
21
// TODO: Auto-generated Javadoc
22
/**
23
 * The Class DownloadFromMetadata is a job node that send a blackboard message to the Download service to start to download file from url
24
 * retrieved by Metadata .
25
 */
26
public class DownloadFromMetadataJobNode extends BlackboardJobNode implements ProgressJobNode {
27

  
28
	/** The inputepr param. */
29
	private String inputeprParam;
30

  
31
	/** The obejct store id. */
32
	private String objectStoreID;
33

  
34
	/** The plugin. */
35
	private String plugin;
36

  
37
	/** The protocol. */
38
	private String protocol;
39

  
40
	/** The mime type. */
41
	private String mimeType;
42

  
43
	/** The process counting result set factory. */
44
	@Autowired
45
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
46

  
47
	/** The progress provider. */
48
	private ResultsetProgressProvider progressProvider;
49

  
50
	/*
51
	 * (non-Javadoc)
52
	 * 
53
	 * @see eu.dnetlib.msro.worker.nodes.BlackboardJobNode#obtainServiceId(com.googlecode.sarasvati.NodeToken)
54
	 */
55
	@Override
56
	protected String obtainServiceId(final NodeToken token) {
57
		return getServiceLocator().getServiceId(DownloadService.class);
58
	}
59

  
60
	/*
61
	 * (non-Javadoc)
62
	 * 
63
	 * @see eu.dnetlib.msro.worker.nodes.BlackboardJobNode#prepareJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob,
64
	 * com.googlecode.sarasvati.NodeToken)
65
	 */
66
	@Override
67
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
68
		job.setAction("DOWNLOAD");
69
		final String eprS = token.getEnv().getAttribute(getInputeprParam());
70
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), eprS);
71
		job.getParameters().put("epr", progressProvider.getEpr().toString());
72
		job.getParameters().put("protocol", getProtocol());
73
		job.getParameters().put("plugin", getPlugin());
74
		job.getParameters().put("mimeType", getMimeType());
75
		job.getParameters().put("objectStoreID", getObjectStoreID());
76

  
77
	}
78

  
79
	/*
80
	 * (non-Javadoc)
81
	 * 
82
	 * @see eu.dnetlib.msro.worker.nodes.BlackboardJobNode#generateBlackboardListener(com.googlecode.sarasvati.Engine,
83
	 * com.googlecode.sarasvati.NodeToken)
84
	 */
85
	@Override
86
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
87
		return new BlackboardWorkflowJobListener(engine, token) {
88

  
89
			@Override
90
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
91
				env.setAttribute(WorkflowConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total"));
92
			}
93
		};
94
	}
95

  
96
	/**
97
	 * Gets the inputepr param.
98
	 *
99
	 * @return the inputeprParam
100
	 */
101
	public String getInputeprParam() {
102
		return inputeprParam;
103
	}
104

  
105
	/**
106
	 * Gets the object store id.
107
	 *
108
	 * @return the objectStoreID
109
	 */
110
	public String getObjectStoreID() {
111
		return objectStoreID;
112
	}
113

  
114
	/**
115
	 * Sets the object store id.
116
	 *
117
	 * @param objectStoreID
118
	 *            the objectStoreID to set
119
	 */
120
	public void setObjectStoreID(final String objectStoreID) {
121
		this.objectStoreID = objectStoreID;
122
	}
123

  
124
	/**
125
	 * Gets the plugin.
126
	 *
127
	 * @return the plugin
128
	 */
129
	public String getPlugin() {
130
		return plugin;
131
	}
132

  
133
	/**
134
	 * Sets the plugin.
135
	 *
136
	 * @param plugin
137
	 *            the plugin to set
138
	 */
139
	public void setPlugin(final String plugin) {
140
		this.plugin = plugin;
141
	}
142

  
143
	/**
144
	 * Gets the protocol.
145
	 *
146
	 * @return the protol
147
	 */
148
	public String getProtocol() {
149
		return protocol;
150
	}
151

  
152
	/**
153
	 * Sets the protocol.
154
	 *
155
	 * @param protol
156
	 *            the protol to set
157
	 */
158
	public void setProtocol(final String protol) {
159
		this.protocol = protol;
160
	}
161

  
162
	/**
163
	 * Gets the mime type.
164
	 *
165
	 * @return the mimeType
166
	 */
167
	public String getMimeType() {
168
		return mimeType;
169
	}
170

  
171
	/**
172
	 * Sets the mime type.
173
	 *
174
	 * @param mimeType
175
	 *            the mimeType to set
176
	 */
177
	public void setMimeType(final String mimeType) {
178
		this.mimeType = mimeType;
179
	}
180

  
181
	/**
182
	 * Sets the inputepr param.
183
	 *
184
	 * @param inputeprParam
185
	 *            the inputeprParam to set
186
	 */
187
	public void setInputeprParam(final String inputeprParam) {
188
		this.inputeprParam = inputeprParam;
189
	}
190

  
191
	/*
192
	 * (non-Javadoc)
193
	 * 
194
	 * @see eu.dnetlib.msro.worker.nodes.ProgressJobNode#getProgressProvider()
195
	 */
196
	@Override
197
	public ProgressProvider getProgressProvider() {
198
		return progressProvider;
199
	}
200

  
201
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/SimpleJobNode.java
1
package eu.dnetlib.msro.worker.nodes;
2

  
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

  
6
import com.googlecode.sarasvati.Engine;
7
import com.googlecode.sarasvati.NodeToken;
8

  
9
import eu.dnetlib.msro.worker.WorkflowConstants;
10

  
11
public abstract class SimpleJobNode extends SarasvatiJobNode {
12

  
13
	private static final Log log = LogFactory.getLog(SarasvatiJobNode.class);
14

  
15
	@Override
16
	public final void execute(final Engine engine, final NodeToken token) {
17
		super.execute(engine, token);
18

  
19
		try {
20
			log.debug("START NODE: " + getBeanName());
21
			beforeStart(token);
22
			String arc = execute(token);
23
			beforeCompleted(token);
24
			log.debug("END NODE (SUCCESS): " + getBeanName());
25

  
26
			engine.complete(token, arc);
27
		} catch (Throwable e) {
28
			log.error("got exception while executing workflow node", e);
29
			log.debug("END NODE (FAILED): " + getBeanName());
30
			beforeFailed(token);
31
			token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true);
32
			token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, e.getMessage());
33
			engine.complete(token, "failed");
34
		}
35
	}
36

  
37
	abstract protected String execute(final NodeToken token) throws Exception;
38

  
39
	protected void beforeStart(final NodeToken token) {
40
		// For optional overwrites
41
	}
42

  
43
	protected void beforeCompleted(final NodeToken token) {
44
		// For optional overwrites
45
	}
46

  
47
	protected void beforeFailed(final NodeToken token) {
48
		// For optional overwrites
49
	}
50

  
51
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/transform/GroovyJobNode.java
1
package eu.dnetlib.msro.worker.nodes.transform;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import javax.annotation.Resource;
7
import javax.xml.ws.wsaddressing.W3CEndpointReference;
8

  
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.springframework.beans.factory.annotation.Required;
12

  
13
import com.google.common.collect.Maps;
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16

  
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
18

  
19
import eu.dnetlib.enabling.resultset.MappedResultSetFactory;
20
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
21
import eu.dnetlib.msro.rmi.ManagerServiceException;
22
import eu.dnetlib.msro.worker.nodes.SimpleJobNode;
23
import groovy.lang.GroovyShell;
24
import groovy.util.GroovyScriptEngine;
25

  
26
public class GroovyJobNode extends SimpleJobNode {
27

  
28
	private static final Log log = LogFactory.getLog(GroovyJobNode.class);
29

  
30
	/**
31
	 * used to transform the records using Groovy.
32
	 */
33

  
34
	private MappedResultSetFactory mappedResultSetFactory;
35

  
36
	private String inputEprParam;
37
	private String outputEprParam;
38
	private String transformationRuleId;
39
	// private String groovyParams;
40

  
41
	@Resource
42
	private DnetServiceLocator serviceLocator;
43

  
44
	private Map<String, String> retrieveGroovyParameter() {
45
		Map<String, String> out = Maps.newHashMap();
46

  
47
		String query = "for $x in collection('/db/DRIVER/GroovyProcessingDSResource/GroovyProcessingDSResourceType')"
48
				+ "where $x[.//RESOURCE_IDENTIFIER/@value='" + transformationRuleId + "']"
49
				+ "return concat($x//GROOVY_CLASSPATH/text(),':::',$x//GROOVY_DNETCLASS/text())";
50
		try {
51
			String result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query).get(0);
52
			if (result == null) { return null; }
53
			String[] data = result.trim().split(":::");
54
			if (data.length == 2) {
55
				out.put("classpath", data[0]);
56
				out.put("mainClass", data[1]);
57
			}
58

  
59
			return out;
60
		} catch (Exception e) {
61
			log.error(e);
62
			return null;
63
		}
64
	}
65

  
66
	@Override
67
	protected String execute(final NodeToken token) throws Exception {
68
		final String inputEprString = token.getEnv().getAttribute(inputEprParam);
69
		if (inputEprString == null || inputEprString.isEmpty()) { throw new ManagerServiceException("InputEprParam (" + inputEprParam + ") not found in ENV"); }
70
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(inputEprString);
71
		String groovyClasspath, groovyDnetClass;
72
		Map<String, String> prop = retrieveGroovyParameter();
73
		groovyClasspath = prop.get("classpath");
74
		groovyDnetClass = prop.get("mainClass");
75
		W3CEndpointReference epr = transformGroovy(inputEpr, groovyClasspath, groovyDnetClass, parseJsonParameters(token));
76
		token.getEnv().setAttribute(outputEprParam, epr.toString());
77
		return Arc.DEFAULT_ARC;
78
	}
79

  
80
	private W3CEndpointReference transformGroovy(final W3CEndpointReference source,
81
			final String groovyClasspath,
82
			final String groovyDnetClass,
83
			final Map<String, String> params) throws ClassNotFoundException, IOException {
84

  
85
		GroovyScriptEngine gse = new GroovyScriptEngine(groovyClasspath);
86
		gse.getGroovyClassLoader().loadClass(groovyDnetClass);
87
		log.info("***********************************************");
88
		log.info("Loaded Groovy classes:");
89
		for (Class<?> c : gse.getGroovyClassLoader().getLoadedClasses()) {
90
			log.info(c.getCanonicalName());
91
		}
92
		log.info("***********************************************");
93
		GroovyShell groovyShell = new GroovyShell(gse.getGroovyClassLoader());
94

  
95
		Object go = groovyShell.evaluate("new " + groovyDnetClass + "()");
96
		if (go instanceof GroovyUnaryFunction) {
97
			GroovyUnaryFunction groovyUnaryFunction = (GroovyUnaryFunction) go;
98
			if (params != null) {
99
				groovyUnaryFunction.setParams(params);
100
			}
101
			return mappedResultSetFactory.createMappedResultSet(source, groovyUnaryFunction);
102
		} else {
103
			throw new RuntimeException("Groovy object " + go + " is not supported");
104
		}
105
	}
106

  
107
	public MappedResultSetFactory getMappedResultSetFactory() {
108
		return mappedResultSetFactory;
109
	}
110

  
111
	@Required
112
	public void setMappedResultSetFactory(final MappedResultSetFactory mappedResultSetFactory) {
113
		this.mappedResultSetFactory = mappedResultSetFactory;
114
	}
115

  
116
	public String getInputEprParam() {
117
		return inputEprParam;
118
	}
119

  
120
	public void setInputEprParam(final String inputEprParam) {
121
		this.inputEprParam = inputEprParam;
122
	}
123

  
124
	public String getOutputEprParam() {
125
		return outputEprParam;
126
	}
127

  
128
	public void setOutputEprParam(final String outputEprParam) {
129
		this.outputEprParam = outputEprParam;
130
	}
131

  
132
	public String getTransformationRuleId() {
133
		return transformationRuleId;
134
	}
135

  
136
	public void setTransformationRuleId(final String transformationRuleId) {
137
		this.transformationRuleId = transformationRuleId;
138
	}
139

  
140
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/transform/TransformJobNode.java
1
package eu.dnetlib.msro.worker.nodes.transform;
2

  
3
import org.springframework.beans.factory.annotation.Autowired;
4
import org.springframework.beans.factory.annotation.Required;
5

  
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8

  
9
import eu.dnetlib.data.resultSet.ResultSetFactory;
10
import eu.dnetlib.data.resultSet.transform.TransformerFactory;
11
import eu.dnetlib.enabling.datastructures.TransformationRule;
12
import eu.dnetlib.enabling.is.client.InformationServiceClient;
13
import eu.dnetlib.msro.worker.nodes.SimpleJobNode;
14
import eu.dnetlib.rmi.objects.resultSet.ResultSet;
15
import eu.dnetlib.rmi.soap.exceptions.ManagerServiceException;
16

  
17
public class TransformJobNode extends SimpleJobNode {
18

  
19
	private String inputResultSetParam;
20
	private String outputResultSetParam;
21
	private String ruleId;
22

  
23
	@Autowired
24
	private InformationServiceClient isClient;
25

  
26
	@Autowired
27
	private ResultSetFactory resultSetFactory;
28

  
29
	private TransformerFactory transformerFactory;
30

  
31
	@Override
32
	protected String execute(final NodeToken token) throws Exception {
33
		@SuppressWarnings("unchecked")
34
		final ResultSet<String> inputRs = (ResultSet<String>) token.getEnv().getTransientAttribute(inputResultSetParam);
35
		if (inputRs == null) { throw new ManagerServiceException("InputEprParam (" + inputResultSetParam + ") not found in ENV"); }
36

  
37
		final TransformationRule rule = isClient.getResourceByCode(ruleId, TransformationRule.class);
38

  
39
		final ResultSet<String> outputRs = resultSetFactory.transform(inputRs, transformerFactory.createTransformer(rule));
40

  
41
		token.getEnv().setAttribute(outputResultSetParam, outputRs.toString());
42

  
43
		return Arc.DEFAULT_ARC;
44
	}
45

  
46
	public String getRuleId() {
47
		return ruleId;
48
	}
49

  
50
	public void setRuleId(final String ruleId) {
51
		this.ruleId = ruleId;
52
	}
53

  
54
	public String getInputResultSetParam() {
55
		return inputResultSetParam;
56
	}
57

  
58
	public void setInputResultSetParam(final String inputResultSetParam) {
59
		this.inputResultSetParam = inputResultSetParam;
60
	}
61

  
62
	public String getOutputResultSetParam() {
63
		return outputResultSetParam;
64
	}
65

  
66
	public void setOutputResultSetParam(final String outputResultSetParam) {
67
		this.outputResultSetParam = outputResultSetParam;
68
	}
69

  
70
	public TransformerFactory getTransformerFactory() {
71
		return transformerFactory;
72
	}
73

  
74
	@Required
75
	public void setTransformerFactory(final TransformerFactory transformerFactory) {
76
		this.transformerFactory = transformerFactory;
77
	}
78

  
79
}
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/mdstore/MultipleMdStoreIterator.java
1
package eu.dnetlib.msro.worker.nodes.mdstore;
2

  
3
import java.util.Iterator;
4
import java.util.List;
5

  
6
import javax.annotation.Resource;
7
import javax.xml.ws.wsaddressing.W3CEndpointReference;
8

  
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.springframework.beans.factory.annotation.Autowired;
12

  
13
import eu.dnetlib.data.mdstore.MDStoreService;
14
import eu.dnetlib.data.mdstore.MDStoreServiceException;
15

  
16
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
17

  
18
// TODO: Auto-generated Javadoc
19
/**
20
 * The Class MultipleMdStoreIterator.
21
 */
22
public class MultipleMdStoreIterator implements Iterator<String> {
23

  
24
	private static final Log log = LogFactory.getLog(MultipleMdStoreIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
25

  
26
	/** The service locator. */
27
	@Resource
28
	private DnetServiceLocator serviceLocator;
29

  
30
	/** The md i ds. */
31
	private List<String> mdIDs;
32

  
33
	/** The current id. */
34
	private String currentId = null;
35

  
36
	/** The current iterator. */
37
	private Iterator<String> currentIterator;
38

  
39
	/** The result set client factory. */
40
	@Autowired
41
	private ResultSetClientFactory resultSetClientFactory;
42

  
43
	/**
44
	 * Instantiates a new multiple md store iterator.
45
	 *
46
	 * @param mdstoreLocator
47
	 *            the mdstore locator
48
	 * @param mdIds
49
	 *            the md ids
50
	 */
51
	public MultipleMdStoreIterator(final DnetServiceLocator serviceLocator, final List<String> mdIds, final ResultSetClientFactory resultSetClientFactory) {
52
		this.serviceLocator = serviceLocator;
53
		this.mdIDs = mdIds;
54
		this.resultSetClientFactory = resultSetClientFactory;
55
		getNextMDStoreRecords();
56

  
57
	}
58

  
59
	/*
60
	 * (non-Javadoc)
61
	 * 
62
	 * @see java.util.Iterator#hasNext()
63
	 */
64
	@Override
65
	public boolean hasNext() {
66
		if (currentId == null || currentIterator == null) { return false; }
67
		return currentIterator.hasNext();
68
	}
69

  
70
	/*
71
	 * (non-Javadoc)
72
	 * 
73
	 * @see java.util.Iterator#next()
74
	 */
75
	@Override
76
	public String next() {
77
		String nextElement = currentIterator.next();
78
		if (!currentIterator.hasNext()) {
79
			getNextMDStoreRecords();
80
		}
81
		return nextElement;
82
	}
83

  
84
	/*
85
	 * (non-Javadoc)
86
	 * 
87
	 * @see java.util.Iterator#remove()
88
	 */
89
	@Override
90
	public void remove() {
91
		currentIterator.remove();
92
	}
93

  
94
	/**
95
	 * Gets the next md store records.
96
	 *
97
	 * @return the next md store records
98
	 */
99
	private void getNextMDStoreRecords() {
100
		if (mdIDs.size() > 0) {
101
			currentId = mdIDs.remove(0);
102
			currentIterator = getIterableResultset(currentId);
103
		}
104
	}
105

  
106
	/**
107
	 * Gets the iterable resultset.
108
	 *
109
	 * @param id
110
	 *            the id
111
	 * @return the iterable resultset
112
	 */
113
	private Iterator<String> getIterableResultset(final String id) {
114
		try {
115
			W3CEndpointReference epr = serviceLocator.getService(MDStoreService.class, id).deliverMDRecords(id, "", "", "");
116
			Iterable<String> input = resultSetClientFactory.getClient(epr);
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff