Revision 36494
Added by Michele Artini about 9 years ago
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/AsyncJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes; |
|
2 |
|
|
3 |
import java.util.concurrent.ExecutorService; |
|
4 |
import java.util.concurrent.Executors; |
|
5 |
|
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
|
|
9 |
import com.googlecode.sarasvati.Engine; |
|
10 |
import com.googlecode.sarasvati.NodeToken; |
|
11 |
|
|
12 |
import eu.dnetlib.msro.worker.WorkflowConstants; |
|
13 |
|
|
14 |
public abstract class AsyncJobNode extends SarasvatiJobNode { |
|
15 |
|
|
16 |
/** |
|
17 |
* logger. |
|
18 |
*/ |
|
19 |
private static final Log log = LogFactory.getLog(AsyncJobNode.class); |
|
20 |
|
|
21 |
private final ExecutorService executor = Executors.newCachedThreadPool(); |
|
22 |
|
|
23 |
@Override |
|
24 |
public void execute(final Engine engine, final NodeToken token) { |
|
25 |
super.execute(engine, token); |
|
26 |
|
|
27 |
log.info("executing async node"); |
|
28 |
|
|
29 |
executor.execute(new Runnable() { |
|
30 |
|
|
31 |
@Override |
|
32 |
public void run() { |
|
33 |
try { |
|
34 |
log.debug("START NODE: " + getBeanName()); |
|
35 |
beforeStart(token); |
|
36 |
String arc = execute(token); |
|
37 |
beforeCompleted(token); |
|
38 |
log.debug("END NODE (SUCCESS): " + getBeanName()); |
|
39 |
engine.complete(token, arc); |
|
40 |
} catch (Throwable e) { |
|
41 |
log.error("got exception while executing workflow node", e); |
|
42 |
log.debug("END NODE (FAILED): " + getBeanName()); |
|
43 |
beforeFailed(token); |
|
44 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true); |
|
45 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, e.getMessage()); |
|
46 |
engine.complete(token, "failed"); |
|
47 |
} |
|
48 |
} |
|
49 |
}); |
|
50 |
} |
|
51 |
|
|
52 |
abstract protected String execute(final NodeToken token) throws Exception; |
|
53 |
|
|
54 |
protected void beforeStart(final NodeToken token) {} |
|
55 |
|
|
56 |
protected void beforeCompleted(final NodeToken token) {} |
|
57 |
|
|
58 |
protected void beforeFailed(final NodeToken token) {} |
|
59 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/unpack/UnpackJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.unpack; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.util.Iterator; |
|
5 |
import java.util.Queue; |
|
6 |
import java.util.concurrent.PriorityBlockingQueue; |
|
7 |
|
|
8 |
import javax.annotation.Resource; |
|
9 |
|
|
10 |
import org.apache.commons.logging.Log; |
|
11 |
import org.apache.commons.logging.LogFactory; |
|
12 |
import org.dom4j.Document; |
|
13 |
import org.dom4j.Node; |
|
14 |
import org.dom4j.io.SAXReader; |
|
15 |
|
|
16 |
import com.googlecode.sarasvati.Arc; |
|
17 |
import com.googlecode.sarasvati.NodeToken; |
|
18 |
|
|
19 |
import eu.dnetlib.data.resultSet.ResultSetFactory; |
|
20 |
import eu.dnetlib.data.resultSet.client.ResultSetClientFactory; |
|
21 |
import eu.dnetlib.msro.worker.nodes.SimpleJobNode; |
|
22 |
import eu.dnetlib.rmi.objects.resultSet.ResultSet; |
|
23 |
|
|
24 |
public class UnpackJobNode extends SimpleJobNode { |
|
25 |
|
|
26 |
/** |
|
27 |
* logger. |
|
28 |
*/ |
|
29 |
private static final Log log = LogFactory.getLog(UnpackJobNode.class); |
|
30 |
|
|
31 |
@Resource |
|
32 |
private ResultSetClientFactory resultSetClientFactory; |
|
33 |
@Resource |
|
34 |
private ResultSetFactory resultSetFactory; |
|
35 |
|
|
36 |
private String inputResultSetParam; |
|
37 |
private String outputResultSetParam; |
|
38 |
private String xpath; |
|
39 |
|
|
40 |
@Override |
|
41 |
protected String execute(final NodeToken token) throws Exception { |
|
42 |
@SuppressWarnings("unchecked") |
|
43 |
final ResultSet<String> rsInput = (ResultSet<String>) token.getEnv().getTransientAttribute(inputResultSetParam); |
|
44 |
|
|
45 |
final Iterator<String> client = resultSetClientFactory.getClient(rsInput, String.class, 100).iterator(); |
|
46 |
final Queue<String> queue = new PriorityBlockingQueue<String>(); |
|
47 |
|
|
48 |
if (client.hasNext()) { |
|
49 |
populateQueue(queue, client.next(), xpath); |
|
50 |
} |
|
51 |
|
|
52 |
final ResultSet<String> rsOutput = resultSetFactory.createResultSet(new Iterable<String>() { |
|
53 |
|
|
54 |
@Override |
|
55 |
public Iterator<String> iterator() { |
|
56 |
return new Iterator<String>() { |
|
57 |
|
|
58 |
@Override |
|
59 |
public boolean hasNext() { |
|
60 |
synchronized (queue) { |
|
61 |
return !queue.isEmpty(); |
|
62 |
} |
|
63 |
} |
|
64 |
|
|
65 |
@Override |
|
66 |
public String next() { |
|
67 |
synchronized (queue) { |
|
68 |
final String res = queue.poll(); |
|
69 |
while (queue.isEmpty() && |
|
70 |
client.hasNext()) { |
|
71 |
populateQueue(queue, client.next(), xpath); |
|
72 |
} |
|
73 |
return res; |
|
74 |
} |
|
75 |
} |
|
76 |
|
|
77 |
@Override |
|
78 |
public void remove() {} |
|
79 |
}; |
|
80 |
} |
|
81 |
}); |
|
82 |
|
|
83 |
token.getEnv().setTransientAttribute(outputResultSetParam, rsOutput); |
|
84 |
|
|
85 |
return Arc.DEFAULT_ARC; |
|
86 |
} |
|
87 |
|
|
88 |
private void populateQueue(final Queue<String> queue, final String record, final String xpath) { |
|
89 |
try { |
|
90 |
final SAXReader reader = new SAXReader(); |
|
91 |
final Document doc = reader.read(new StringReader(record)); |
|
92 |
for (Object o : doc.selectNodes(xpath)) { |
|
93 |
queue.add(((Node) o).asXML()); |
|
94 |
} |
|
95 |
} catch (Exception e) { |
|
96 |
log.error("Error unpacking record: \n" + record, e); |
|
97 |
throw new RuntimeException(e); |
|
98 |
} |
|
99 |
} |
|
100 |
|
|
101 |
public String getXpath() { |
|
102 |
return xpath; |
|
103 |
} |
|
104 |
|
|
105 |
public void setXpath(final String xpath) { |
|
106 |
this.xpath = xpath; |
|
107 |
} |
|
108 |
|
|
109 |
public String getInputResultSetParam() { |
|
110 |
return inputResultSetParam; |
|
111 |
} |
|
112 |
|
|
113 |
public void setInputResultSetParam(final String inputResultSetParam) { |
|
114 |
this.inputResultSetParam = inputResultSetParam; |
|
115 |
} |
|
116 |
|
|
117 |
public String getOutputResultSetParam() { |
|
118 |
return outputResultSetParam; |
|
119 |
} |
|
120 |
|
|
121 |
public void setOutputResultSetParam(final String outputResultSetParam) { |
|
122 |
this.outputResultSetParam = outputResultSetParam; |
|
123 |
} |
|
124 |
|
|
125 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/BlackboardJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes; |
|
2 |
|
|
3 |
import javax.annotation.Resource; |
|
4 |
|
|
5 |
import org.apache.commons.lang.StringUtils; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
|
|
9 |
import com.googlecode.sarasvati.Arc; |
|
10 |
import com.googlecode.sarasvati.Engine; |
|
11 |
import com.googlecode.sarasvati.NodeToken; |
|
12 |
import com.googlecode.sarasvati.env.Env; |
|
13 |
|
|
14 |
import eu.dnetlib.common.services.locators.DnetServiceLocator; |
|
15 |
import eu.dnetlib.enabling.blackboard.BlackboardCallback; |
|
16 |
import eu.dnetlib.enabling.blackboard.BlackboardDispatcher; |
|
17 |
import eu.dnetlib.msro.worker.WorkflowConstants; |
|
18 |
import eu.dnetlib.rmi.blackboard.AbstractBlackboardMessage; |
|
19 |
|
|
20 |
public abstract class BlackboardJobNode<T> extends SarasvatiJobNode implements BlackboardCallback<T> { |
|
21 |
|
|
22 |
/** |
|
23 |
* logger. |
|
24 |
*/ |
|
25 |
private static final Log log = LogFactory.getLog(BlackboardJobNode.class); |
|
26 |
|
|
27 |
@Resource |
|
28 |
private DnetServiceLocator serviceLocator; |
|
29 |
|
|
30 |
@Resource |
|
31 |
private BlackboardDispatcher blackboardDispatcher; |
|
32 |
|
|
33 |
private Engine engine; |
|
34 |
|
|
35 |
private NodeToken token; |
|
36 |
|
|
37 |
@Override |
|
38 |
public void execute(final Engine engine, final NodeToken token) { |
|
39 |
super.execute(engine, token); |
|
40 |
|
|
41 |
this.engine = engine; |
|
42 |
this.token = token; |
|
43 |
|
|
44 |
log.info("executing blackboard node"); |
|
45 |
|
|
46 |
try { |
|
47 |
token.getEnv().setAttribute(WorkflowConstants.BLACKBOARD_IS_BLACKBOARD, true); |
|
48 |
|
|
49 |
final String serviceId = obtainServiceId(token); |
|
50 |
if (StringUtils.isBlank(serviceId)) { |
|
51 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true); |
|
52 |
final String msg = "cannot locate target service profile: " + serviceId; |
|
53 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, msg); |
|
54 |
log.error(msg); |
|
55 |
engine.complete(token, "failed"); |
|
56 |
return; |
|
57 |
} |
|
58 |
blackboardDispatcher.createDispatcher(serviceId, getMessage(), this); |
|
59 |
} catch (final Throwable e) { |
|
60 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true); |
|
61 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, "cannot prepare blackboard job: " + e); |
|
62 |
engine.complete(token, "failed"); |
|
63 |
log.error("cannot prepare blackboard job", e); |
|
64 |
} |
|
65 |
} |
|
66 |
|
|
67 |
abstract protected T getMessage(); |
|
68 |
|
|
69 |
abstract protected String obtainServiceId(NodeToken token); |
|
70 |
|
|
71 |
@Override |
|
72 |
public void onSuccess(final T t) { |
|
73 |
log.debug("Blackboard workflow node DONE"); |
|
74 |
saveResponseInEnv(t, token.getEnv()); |
|
75 |
engine.complete(token, Arc.DEFAULT_ARC); |
|
76 |
engine.executeQueuedArcTokens(token.getProcess()); |
|
77 |
} |
|
78 |
|
|
79 |
abstract protected void saveResponseInEnv(final T t, final Env env); |
|
80 |
|
|
81 |
@Override |
|
82 |
public void onFail(final T t) { |
|
83 |
final String error = t instanceof AbstractBlackboardMessage ? ((AbstractBlackboardMessage) t).getError() : "Unknown Error"; |
|
84 |
log.warn("Blackboard workflow node FAILED: " + error); |
|
85 |
saveResponseInEnv(t, token.getEnv()); |
|
86 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true); |
|
87 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, error); |
|
88 |
engine.complete(token, "failed"); |
|
89 |
} |
|
90 |
|
|
91 |
@Override |
|
92 |
public void onOngoing(final T t) { |
|
93 |
getToken().getEnv().setAttribute(WorkflowConstants.BLACKBOARD_IS_GOING, true); |
|
94 |
} |
|
95 |
|
|
96 |
public Engine getEngine() { |
|
97 |
return engine; |
|
98 |
} |
|
99 |
|
|
100 |
public void setEngine(final Engine engine) { |
|
101 |
this.engine = engine; |
|
102 |
} |
|
103 |
|
|
104 |
public NodeToken getToken() { |
|
105 |
return token; |
|
106 |
} |
|
107 |
|
|
108 |
public void setToken(final NodeToken token) { |
|
109 |
this.token = token; |
|
110 |
} |
|
111 |
|
|
112 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/sel/SelectiveNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.sel; |
|
2 |
|
|
3 |
import com.googlecode.sarasvati.Arc; |
|
4 |
import com.googlecode.sarasvati.NodeToken; |
|
5 |
|
|
6 |
import eu.dnetlib.msro.worker.nodes.SimpleJobNode; |
|
7 |
|
|
8 |
/** |
|
9 |
* The Class SelectiveNode allows to decide which path a workflow must take. |
|
10 |
*/ |
|
11 |
public class SelectiveNode extends SimpleJobNode { |
|
12 |
|
|
13 |
/** The selection. */ |
|
14 |
private String selection = Arc.DEFAULT_ARC; |
|
15 |
|
|
16 |
/* |
|
17 |
* (non-Javadoc) |
|
18 |
* |
|
19 |
* @see eu.dnetlib.msro.worker.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken) |
|
20 |
*/ |
|
21 |
@Override |
|
22 |
protected String execute(final NodeToken token) throws Exception { |
|
23 |
return selection; |
|
24 |
} |
|
25 |
|
|
26 |
/** |
|
27 |
* Gets the selection. |
|
28 |
* |
|
29 |
* @return the selection |
|
30 |
*/ |
|
31 |
public String getSelection() { |
|
32 |
return selection; |
|
33 |
} |
|
34 |
|
|
35 |
/** |
|
36 |
* Sets the selection. |
|
37 |
* |
|
38 |
* @param selection |
|
39 |
* the new selection |
|
40 |
*/ |
|
41 |
public void setSelection(final String selection) { |
|
42 |
this.selection = selection; |
|
43 |
} |
|
44 |
|
|
45 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/objectStore/RetrieveMdStoreId.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.objectStore; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
import java.util.Set; |
|
5 |
|
|
6 |
import javax.annotation.Resource; |
|
7 |
|
|
8 |
import org.springframework.beans.factory.annotation.Required; |
|
9 |
|
|
10 |
import com.google.common.collect.Lists; |
|
11 |
import com.google.common.collect.Sets; |
|
12 |
import com.google.gson.Gson; |
|
13 |
import com.google.gson.GsonBuilder; |
|
14 |
import com.googlecode.sarasvati.Arc; |
|
15 |
import com.googlecode.sarasvati.NodeToken; |
|
16 |
|
|
17 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
18 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
19 |
|
|
20 |
import eu.dnetlib.msro.worker.nodes.SimpleJobNode; |
|
21 |
|
|
22 |
/** |
|
23 |
* The Class RetrieveMdStoreId is a job node used to retrieve the correct MDStore from which extract the url of the file to download. |
|
24 |
* metadata format and interpretation are injected as properties |
|
25 |
*/ |
|
26 |
public class RetrieveMdStoreId extends SimpleJobNode { |
|
27 |
|
|
28 |
/** The metadata format. */ |
|
29 |
private String metadataFormat; |
|
30 |
|
|
31 |
/** The interpretation. */ |
|
32 |
private String interpretation; |
|
33 |
|
|
34 |
/** The provider id. */ |
|
35 |
private String providerId; |
|
36 |
|
|
37 |
/** The service locator. */ |
|
38 |
@Resource |
|
39 |
private DnetServiceLocator serviceLocator; |
|
40 |
|
|
41 |
/* |
|
42 |
* (non-Javadoc) |
|
43 |
* |
|
44 |
* @see eu.dnetlib.msro.worker.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken) |
|
45 |
*/ |
|
46 |
@Override |
|
47 |
protected String execute(final NodeToken token) throws Exception { |
|
48 |
|
|
49 |
String workflowQuery = |
|
50 |
"for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') where($x//DATAPROVIDER/@id='%s') return distinct-values($x//WORKFLOW/@id/string())"; |
|
51 |
|
|
52 |
List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(workflowQuery, providerId)); |
|
53 |
if (result.size() == 0) { throw new RuntimeException("there is no mdStore Associated to the provider " + token.getEnv().getAttribute(getProviderId())); } |
|
54 |
Set<String> workflowIds = Sets.newHashSet(result); |
|
55 |
|
|
56 |
Set<String> metadataIds = getMdStores(workflowIds); |
|
57 |
Gson g = new GsonBuilder().disableHtmlEscaping().create(); |
|
58 |
token.getEnv().setAttribute("mdId", g.toJson(metadataIds)); |
|
59 |
|
|
60 |
token.getEnv().setAttribute("mdFormat", getMetadataFormat()); |
|
61 |
return Arc.DEFAULT_ARC; |
|
62 |
} |
|
63 |
|
|
64 |
private Set<String> getMdStores(final Set<String> workflowsId) { |
|
65 |
try { |
|
66 |
|
|
67 |
String query = "//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s']//PARAM[./@category/string()='MDSTORE_ID']/text()"; |
|
68 |
|
|
69 |
Set<String> mdStores = Sets.newHashSet(); |
|
70 |
|
|
71 |
if (workflowsId == null) { return null; } |
|
72 |
|
|
73 |
for (String workflowId : workflowsId) { |
|
74 |
List<String> result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(query, workflowId)); |
|
75 |
Set<String> metadataIds = Sets.newHashSet(result); |
|
76 |
mdStores.addAll(getRightMetadataId(Lists.newArrayList(metadataIds))); |
|
77 |
} |
|
78 |
return mdStores; |
|
79 |
|
|
80 |
} catch (ISLookUpException e) { |
|
81 |
|
|
82 |
return null; |
|
83 |
} |
|
84 |
} |
|
85 |
|
|
86 |
/** |
|
87 |
* Gets the right metadata id whith the format metadataFormat and interpretation interpretation |
|
88 |
* |
|
89 |
* @return the right metadata id |
|
90 |
* @throws ISLookUpException |
|
91 |
*/ |
|
92 |
private Set<String> getRightMetadataId(final Iterable<String> ids) throws ISLookUpException { |
|
93 |
String query = |
|
94 |
"let $x:=//RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s'] return concat($x//METADATA_FORMAT/text(), '::<<>>::', $x//METADATA_FORMAT_INTERPRETATION/text())"; |
|
95 |
Set<String> result = Sets.newHashSet(); |
|
96 |
|
|
97 |
for (String id : ids) { |
|
98 |
|
|
99 |
List<String> results = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(String.format(query, id)); |
|
100 |
if (results.size() > 0) { |
|
101 |
String[] values = results.get(0).split("::<<>>::"); |
|
102 |
if (metadataFormat.equals(values[0]) && interpretation.equals(values[1])) { |
|
103 |
result.add(id); |
|
104 |
} |
|
105 |
} |
|
106 |
} |
|
107 |
return result; |
|
108 |
|
|
109 |
} |
|
110 |
|
|
111 |
/** |
|
112 |
* Gets the interpretation. |
|
113 |
* |
|
114 |
* @return the interpretation |
|
115 |
*/ |
|
116 |
public String getInterpretation() { |
|
117 |
return interpretation; |
|
118 |
} |
|
119 |
|
|
120 |
/** |
|
121 |
* Sets the interpretation. |
|
122 |
* |
|
123 |
* @param interpretation |
|
124 |
* the interpretation to set |
|
125 |
*/ |
|
126 |
@Required |
|
127 |
public void setInterpretation(final String interpretation) { |
|
128 |
this.interpretation = interpretation; |
|
129 |
} |
|
130 |
|
|
131 |
/** |
|
132 |
* Gets the metadata format. |
|
133 |
* |
|
134 |
* @return the metadataFormat |
|
135 |
*/ |
|
136 |
public String getMetadataFormat() { |
|
137 |
return metadataFormat; |
|
138 |
} |
|
139 |
|
|
140 |
/** |
|
141 |
* Sets the metadata format. |
|
142 |
* |
|
143 |
* @param metadataFormat |
|
144 |
* the metadataFormat to set |
|
145 |
*/ |
|
146 |
@Required |
|
147 |
public void setMetadataFormat(final String metadataFormat) { |
|
148 |
this.metadataFormat = metadataFormat; |
|
149 |
} |
|
150 |
|
|
151 |
/** |
|
152 |
* Gets the provider id. |
|
153 |
* |
|
154 |
* @return the providerId |
|
155 |
*/ |
|
156 |
public String getProviderId() { |
|
157 |
return providerId; |
|
158 |
} |
|
159 |
|
|
160 |
/** |
|
161 |
* Sets the provider id. |
|
162 |
* |
|
163 |
* @param providerId |
|
164 |
* the providerId to set |
|
165 |
*/ |
|
166 |
public void setProviderId(final String providerId) { |
|
167 |
this.providerId = providerId; |
|
168 |
} |
|
169 |
|
|
170 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/objectStore/UpdateObjectStoreSizeJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.objectStore; |
|
2 |
|
|
3 |
import javax.annotation.Resource; |
|
4 |
|
|
5 |
import com.googlecode.sarasvati.Arc; |
|
6 |
import com.googlecode.sarasvati.NodeToken; |
|
7 |
|
|
8 |
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService; |
|
9 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; |
|
10 |
|
|
11 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
12 |
import eu.dnetlib.msro.worker.nodes.SimpleJobNode; |
|
13 |
|
|
14 |
// TODO: Auto-generated Javadoc |
|
15 |
/** |
|
16 |
* The Class UpdateObjectStoreSizeJobNode. |
|
17 |
*/ |
|
18 |
public class UpdateObjectStoreSizeJobNode extends SimpleJobNode { |
|
19 |
|
|
20 |
/** The object store id. */ |
|
21 |
private String objectStoreIdParam; |
|
22 |
|
|
23 |
/** The service locator. */ |
|
24 |
@Resource |
|
25 |
private DnetServiceLocator serviceLocator; |
|
26 |
|
|
27 |
/* |
|
28 |
* (non-Javadoc) |
|
29 |
* |
|
30 |
* @see eu.dnetlib.msro.worker.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken) |
|
31 |
*/ |
|
32 |
@Override |
|
33 |
protected String execute(final NodeToken token) throws Exception { |
|
34 |
|
|
35 |
final ISRegistryService registry = serviceLocator.getService(ISRegistryService.class); |
|
36 |
|
|
37 |
int size = serviceLocator.getService(ObjectStoreService.class, objectStoreIdParam).getSize(objectStoreIdParam); |
|
38 |
|
|
39 |
String now = DateUtils.now_ISO8601(); |
|
40 |
|
|
41 |
String mdstoreXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + objectStoreIdParam + "']" |
|
42 |
+ "return update value $x//LAST_STORAGE_DATE with '" + now + "'"; |
|
43 |
|
|
44 |
registry.executeXUpdate(mdstoreXUpdate); |
|
45 |
|
|
46 |
String mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + objectStoreIdParam + "']" |
|
47 |
+ "return update value $x//COUNT_STORE with '" + size + "'"; |
|
48 |
|
|
49 |
registry.executeXUpdate(mdstoreNumberXUpdate); |
|
50 |
|
|
51 |
mdstoreNumberXUpdate = "for $x in //RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value = '" + objectStoreIdParam + "']" |
|
52 |
+ "return update value $x//STORE_SIZE with '" + size + "'"; |
|
53 |
|
|
54 |
registry.executeXUpdate(mdstoreNumberXUpdate); |
|
55 |
|
|
56 |
return Arc.DEFAULT_ARC; |
|
57 |
} |
|
58 |
|
|
59 |
/** |
|
60 |
* Gets the object store id param. |
|
61 |
* |
|
62 |
* @return the objectStoreIdParam |
|
63 |
*/ |
|
64 |
public String getObjectStoreIdParam() { |
|
65 |
return objectStoreIdParam; |
|
66 |
} |
|
67 |
|
|
68 |
/** |
|
69 |
* Sets the object store id param. |
|
70 |
* |
|
71 |
* @param objectStoreIdParam |
|
72 |
* the new object store id param |
|
73 |
*/ |
|
74 |
public void setObjectStoreIdParam(final String objectStoreIdParam) { |
|
75 |
this.objectStoreIdParam = objectStoreIdParam; |
|
76 |
} |
|
77 |
|
|
78 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/repobye/DeleteMetaWfJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.repobye; |
|
2 |
|
|
3 |
import java.io.StringReader; |
|
4 |
import java.io.StringWriter; |
|
5 |
|
|
6 |
import javax.annotation.Resource; |
|
7 |
|
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
10 |
import org.dom4j.Document; |
|
11 |
import org.dom4j.Node; |
|
12 |
import org.dom4j.io.SAXReader; |
|
13 |
|
|
14 |
import com.googlecode.sarasvati.Arc; |
|
15 |
import com.googlecode.sarasvati.NodeToken; |
|
16 |
|
|
17 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
18 |
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService; |
|
19 |
|
|
20 |
import eu.dnetlib.msro.worker.nodes.SimpleJobNode; |
|
21 |
import eu.dnetlib.msro.workflows.util.WorkflowConstants; |
|
22 |
|
|
23 |
public class DeleteMetaWfJobNode extends SimpleJobNode { |
|
24 |
|
|
25 |
private String metaWfId; |
|
26 |
|
|
27 |
@Resource |
|
28 |
private DnetServiceLocator serviceLocator; |
|
29 |
|
|
30 |
private static final Log log = LogFactory.getLog(DeleteMetaWfJobNode.class); |
|
31 |
|
|
32 |
@Override |
|
33 |
protected String execute(final NodeToken token) throws Exception { |
|
34 |
final String profile = serviceLocator.getService(ISLookUpService.class).getResourceProfile(metaWfId); |
|
35 |
final Document doc = new SAXReader().read(new StringReader(profile)); |
|
36 |
|
|
37 |
final String dsId = doc.valueOf("//DATAPROVIDER/@id"); |
|
38 |
final String dsName = doc.valueOf("//DATAPROVIDER/text()"); |
|
39 |
final String ifaceId = doc.valueOf("//DATAPROVIDER/@interface"); |
|
40 |
final String destroyWfId = doc.valueOf("//CONFIGURATION/@destroyWorkflow"); |
|
41 |
|
|
42 |
log.info("Removing a MetaWf of dataprovider: " + dsId); |
|
43 |
|
|
44 |
token.getEnv().setAttribute(WorkflowConstants.DATAPROVIDER_ID, dsId); |
|
45 |
token.getEnv().setAttribute(WorkflowConstants.DATAPROVIDER_NAME, dsName); |
|
46 |
token.getEnv().setAttribute(WorkflowConstants.DATAPROVIDER_INTERFACE, ifaceId); |
|
47 |
|
|
48 |
final ISRegistryService registry = serviceLocator.getService(ISRegistryService.class); |
|
49 |
|
|
50 |
for (Object o : doc.selectNodes("//WORKFLOW")) { |
|
51 |
final String wfId = ((Node) o).valueOf("@id"); |
|
52 |
try { |
|
53 |
registry.deleteProfile(wfId); |
|
54 |
log.info(" - Deleted Workflow: " + wfId); |
|
55 |
} catch (Exception e) { |
|
56 |
log.error(" - (ERR) Error deleting profile " + wfId); |
|
57 |
} |
|
58 |
} |
|
59 |
registry.deleteProfile(metaWfId); |
|
60 |
log.info(" - Deleted MetaWorkflow: " + metaWfId); |
|
61 |
|
|
62 |
registry.deleteProfile(destroyWfId); |
|
63 |
log.info(" - Deleted destroy workflow: " + destroyWfId); |
|
64 |
|
|
65 |
verifyDatasource(dsId, ifaceId); |
|
66 |
|
|
67 |
return Arc.DEFAULT_ARC; |
|
68 |
} |
|
69 |
|
|
70 |
private void verifyDatasource(final String dsId, final String ifaceId) throws Exception { |
|
71 |
final StringWriter sw = new StringWriter(); |
|
72 |
|
|
73 |
sw.append("for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType') where"); |
|
74 |
sw.append(" $x//DATAPROVIDER/@id = '" + dsId + "' and "); |
|
75 |
sw.append(" $x//DATAPROVIDER/@interface = '" + ifaceId + "' and "); |
|
76 |
sw.append(" $x//RESOURCE_IDENTIFIER/@value != '" + metaWfId + "' "); |
|
77 |
sw.append("return $x//RESOURCE_IDENTIFIER/@value/string()"); |
|
78 |
|
|
79 |
final boolean active = !serviceLocator.getService(ISLookUpService.class).quickSearchProfile(sw.toString()).isEmpty(); |
|
80 |
|
|
81 |
log.info(" - Updating iface, active status: " + active); |
|
82 |
|
|
83 |
updateIfaceActivationStatus(dsId, ifaceId, active); |
|
84 |
} |
|
85 |
|
|
86 |
protected void updateIfaceActivationStatus(final String dsId, final String ifaceId, final boolean active) throws Exception { |
|
87 |
serviceLocator.getService(ISRegistryService.class).updateProfileNode(dsId, "//INTERFACE[@id = '" + ifaceId + "']/@active", "'" + active + "'"); |
|
88 |
} |
|
89 |
|
|
90 |
public String getMetaWfId() { |
|
91 |
return metaWfId; |
|
92 |
} |
|
93 |
|
|
94 |
public void setMetaWfId(final String metaWfId) { |
|
95 |
this.metaWfId = metaWfId; |
|
96 |
} |
|
97 |
|
|
98 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/SuccessFailureNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes; |
|
2 |
|
|
3 |
import java.util.Map; |
|
4 |
|
|
5 |
import javax.annotation.Resource; |
|
6 |
|
|
7 |
import org.antlr.stringtemplate.StringTemplate; |
|
8 |
import org.apache.commons.io.IOUtils; |
|
9 |
import org.apache.commons.lang.StringEscapeUtils; |
|
10 |
import org.apache.commons.lang.StringUtils; |
|
11 |
import org.apache.commons.logging.Log; |
|
12 |
import org.apache.commons.logging.LogFactory; |
|
13 |
import org.springframework.beans.factory.annotation.Required; |
|
14 |
|
|
15 |
import com.google.common.collect.Maps; |
|
16 |
import com.googlecode.sarasvati.Arc; |
|
17 |
import com.googlecode.sarasvati.NodeToken; |
|
18 |
|
|
19 |
import eu.dnetlib.common.services.locators.DnetServiceLocator; |
|
20 |
import eu.dnetlib.miscutils.DateUtils; |
|
21 |
import eu.dnetlib.msro.worker.WorkflowConstants; |
|
22 |
|
|
23 |
/** |
|
24 |
* The success node sets the "isCompletedSuccessfully" env var. |
|
25 |
* |
|
26 |
*/ |
|
27 |
public class SuccessFailureNode extends SimpleJobNode { |
|
28 |
|
|
29 |
/** |
|
30 |
* is completed successfully. |
|
31 |
*/ |
|
32 |
private boolean success; |
|
33 |
|
|
34 |
@Resource |
|
35 |
private DnetServiceLocator serviceLocator; |
|
36 |
|
|
37 |
private static final Log log = LogFactory.getLog(SuccessFailureNode.class); |
|
38 |
|
|
39 |
@Override |
|
40 |
protected String execute(final NodeToken token) { |
|
41 |
final String profileId = token.getFullEnv().getAttribute(WorkflowConstants.SYSTEM_WF_PROFILE_ID).trim(); |
|
42 |
|
|
43 |
final long now = DateUtils.now(); |
|
44 |
final String date = DateUtils.getDate_ISO8601(now); |
|
45 |
|
|
46 |
token.getProcess().getEnv().setAttribute(WorkflowConstants.SYSTEM_END_DATE, now); |
|
47 |
token.getProcess().getEnv().setAttribute(WorkflowConstants.SYSTEM_END_HUMAN_DATE, date); |
|
48 |
|
|
49 |
final Map<String, String> params = mergeEnvAttributes(token); |
|
50 |
try { |
|
51 |
final String template = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/msro/workflows/templates/workflow_status.xml.st")); |
|
52 |
final StringTemplate st = new StringTemplate(template); |
|
53 |
st.setAttribute("procId", StringEscapeUtils.escapeXml(params.get(WorkflowConstants.SYSTEM_WF_PROCESS_ID))); |
|
54 |
st.setAttribute("date", StringEscapeUtils.escapeXml(date)); |
|
55 |
st.setAttribute("params", filterOutputParams(params)); |
|
56 |
if (!isSuccess()) { |
|
57 |
st.setAttribute("error", StringEscapeUtils.escapeXml(params.get(WorkflowConstants.SYSTEM_ERROR))); |
|
58 |
} |
|
59 |
|
|
60 |
token.getProcess().getEnv().setAttribute(WorkflowConstants.SYSTEM_COMPLETED_SUCCESSFULLY, isSuccess()); |
|
61 |
} catch (Exception e) { |
|
62 |
log.error("Error updating workflow profile: " + profileId, e); |
|
63 |
token.getProcess().getEnv().setAttribute(WorkflowConstants.SYSTEM_COMPLETED_SUCCESSFULLY, false); |
|
64 |
} |
|
65 |
|
|
66 |
return Arc.DEFAULT_ARC; |
|
67 |
} |
|
68 |
|
|
69 |
private Map<String, String> filterOutputParams(final Map<String, String> map) { |
|
70 |
final Map<String, String> res = Maps.newHashMap(); |
|
71 |
|
|
72 |
if (map != null) { |
|
73 |
for (String k : map.keySet()) { |
|
74 |
if (!StringUtils.isBlank(k) && (k.startsWith(WorkflowConstants.DATAPROVIDER_PREFIX) || k.startsWith(WorkflowConstants.MAIN_LOG_PREFIX))) { |
|
75 |
final String key = StringEscapeUtils.escapeXml(k); |
|
76 |
final String v = map.get(k); |
|
77 |
res.put(key, v != null ? StringEscapeUtils.escapeXml(v) : "null"); |
|
78 |
} |
|
79 |
} |
|
80 |
} |
|
81 |
|
|
82 |
return res; |
|
83 |
} |
|
84 |
|
|
85 |
private Map<String, String> mergeEnvAttributes(final NodeToken token) { |
|
86 |
final Map<String, String> map = Maps.newHashMap(); |
|
87 |
|
|
88 |
for (String s : token.getEnv().getAttributeNames()) { |
|
89 |
map.put(s, token.getEnv().getAttribute(s)); |
|
90 |
} |
|
91 |
for (String s : token.getFullEnv().getAttributeNames()) { |
|
92 |
map.put(s, token.getFullEnv().getAttribute(s)); |
|
93 |
} |
|
94 |
return map; |
|
95 |
} |
|
96 |
|
|
97 |
public boolean isSuccess() { |
|
98 |
return success; |
|
99 |
} |
|
100 |
|
|
101 |
@Required |
|
102 |
public void setSuccess(final boolean success) { |
|
103 |
this.success = success; |
|
104 |
} |
|
105 |
|
|
106 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/index/PrepareCreateIndexJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.index; |
|
2 |
|
|
3 |
import org.apache.commons.logging.Log; |
|
4 |
import org.apache.commons.logging.LogFactory; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.NodeToken; |
|
7 |
|
|
8 |
import eu.dnetlib.msro.worker.nodes.SimpleJobNode; |
|
9 |
|
|
10 |
public class PrepareCreateIndexJobNode extends SimpleJobNode { |
|
11 |
|
|
12 |
private static final Log log = LogFactory.getLog(PrepareCreateIndexJobNode.class); |
|
13 |
|
|
14 |
private String layout; |
|
15 |
private String format; |
|
16 |
private String interpretation; |
|
17 |
|
|
18 |
@Override |
|
19 |
protected String execute(final NodeToken token) throws Exception { |
|
20 |
log.info("Preparing env for CreateIndexJobNode"); |
|
21 |
token.getEnv().setAttribute("layout", layout); |
|
22 |
token.getEnv().setAttribute("format", format); |
|
23 |
token.getEnv().setAttribute("interpretation", interpretation); |
|
24 |
return null; |
|
25 |
} |
|
26 |
|
|
27 |
public String getLayout() { |
|
28 |
return layout; |
|
29 |
} |
|
30 |
|
|
31 |
public void setLayout(final String layout) { |
|
32 |
this.layout = layout; |
|
33 |
} |
|
34 |
|
|
35 |
public String getFormat() { |
|
36 |
return format; |
|
37 |
} |
|
38 |
|
|
39 |
public void setFormat(final String format) { |
|
40 |
this.format = format; |
|
41 |
} |
|
42 |
|
|
43 |
public String getInterpretation() { |
|
44 |
return interpretation; |
|
45 |
} |
|
46 |
|
|
47 |
public void setInterpretation(final String interpretation) { |
|
48 |
this.interpretation = interpretation; |
|
49 |
} |
|
50 |
|
|
51 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/download/DownloadFromMetadataJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.download; |
|
2 |
|
|
3 |
import java.util.Map; |
|
4 |
|
|
5 |
import org.springframework.beans.factory.annotation.Autowired; |
|
6 |
|
|
7 |
import com.googlecode.sarasvati.Engine; |
|
8 |
import com.googlecode.sarasvati.NodeToken; |
|
9 |
import com.googlecode.sarasvati.env.Env; |
|
10 |
|
|
11 |
import eu.dnetlib.data.download.rmi.DownloadService; |
|
12 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
13 |
import eu.dnetlib.msro.worker.nodes.BlackboardJobNode; |
|
14 |
import eu.dnetlib.msro.worker.nodes.ProgressJobNode; |
|
15 |
import eu.dnetlib.msro.worker.nodes.blackboard.BlackboardWorkflowJobListener; |
|
16 |
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory; |
|
17 |
import eu.dnetlib.msro.workflows.util.ProgressProvider; |
|
18 |
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider; |
|
19 |
import eu.dnetlib.msro.workflows.util.WorkflowConstants; |
|
20 |
|
|
21 |
// TODO: Auto-generated Javadoc |
|
22 |
/** |
|
23 |
* The Class DownloadFromMetadata is a job node that send a blackboard message to the Download service to start to download file from url |
|
24 |
* retrieved by Metadata . |
|
25 |
*/ |
|
26 |
public class DownloadFromMetadataJobNode extends BlackboardJobNode implements ProgressJobNode { |
|
27 |
|
|
28 |
/** The inputepr param. */ |
|
29 |
private String inputeprParam; |
|
30 |
|
|
31 |
/** The obejct store id. */ |
|
32 |
private String objectStoreID; |
|
33 |
|
|
34 |
/** The plugin. */ |
|
35 |
private String plugin; |
|
36 |
|
|
37 |
/** The protocol. */ |
|
38 |
private String protocol; |
|
39 |
|
|
40 |
/** The mime type. */ |
|
41 |
private String mimeType; |
|
42 |
|
|
43 |
/** The process counting result set factory. */ |
|
44 |
@Autowired |
|
45 |
private ProcessCountingResultSetFactory processCountingResultSetFactory; |
|
46 |
|
|
47 |
/** The progress provider. */ |
|
48 |
private ResultsetProgressProvider progressProvider; |
|
49 |
|
|
50 |
/* |
|
51 |
* (non-Javadoc) |
|
52 |
* |
|
53 |
* @see eu.dnetlib.msro.worker.nodes.BlackboardJobNode#obtainServiceId(com.googlecode.sarasvati.NodeToken) |
|
54 |
*/ |
|
55 |
@Override |
|
56 |
protected String obtainServiceId(final NodeToken token) { |
|
57 |
return getServiceLocator().getServiceId(DownloadService.class); |
|
58 |
} |
|
59 |
|
|
60 |
/* |
|
61 |
* (non-Javadoc) |
|
62 |
* |
|
63 |
* @see eu.dnetlib.msro.worker.nodes.BlackboardJobNode#prepareJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob, |
|
64 |
* com.googlecode.sarasvati.NodeToken) |
|
65 |
*/ |
|
66 |
@Override |
|
67 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
|
68 |
job.setAction("DOWNLOAD"); |
|
69 |
final String eprS = token.getEnv().getAttribute(getInputeprParam()); |
|
70 |
this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), eprS); |
|
71 |
job.getParameters().put("epr", progressProvider.getEpr().toString()); |
|
72 |
job.getParameters().put("protocol", getProtocol()); |
|
73 |
job.getParameters().put("plugin", getPlugin()); |
|
74 |
job.getParameters().put("mimeType", getMimeType()); |
|
75 |
job.getParameters().put("objectStoreID", getObjectStoreID()); |
|
76 |
|
|
77 |
} |
|
78 |
|
|
79 |
/* |
|
80 |
* (non-Javadoc) |
|
81 |
* |
|
82 |
* @see eu.dnetlib.msro.worker.nodes.BlackboardJobNode#generateBlackboardListener(com.googlecode.sarasvati.Engine, |
|
83 |
* com.googlecode.sarasvati.NodeToken) |
|
84 |
*/ |
|
85 |
@Override |
|
86 |
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) { |
|
87 |
return new BlackboardWorkflowJobListener(engine, token) { |
|
88 |
|
|
89 |
@Override |
|
90 |
protected void populateEnv(final Env env, final Map<String, String> responseParams) { |
|
91 |
env.setAttribute(WorkflowConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total")); |
|
92 |
} |
|
93 |
}; |
|
94 |
} |
|
95 |
|
|
96 |
/** |
|
97 |
* Gets the inputepr param. |
|
98 |
* |
|
99 |
* @return the inputeprParam |
|
100 |
*/ |
|
101 |
public String getInputeprParam() { |
|
102 |
return inputeprParam; |
|
103 |
} |
|
104 |
|
|
105 |
/** |
|
106 |
* Gets the object store id. |
|
107 |
* |
|
108 |
* @return the objectStoreID |
|
109 |
*/ |
|
110 |
public String getObjectStoreID() { |
|
111 |
return objectStoreID; |
|
112 |
} |
|
113 |
|
|
114 |
/** |
|
115 |
* Sets the object store id. |
|
116 |
* |
|
117 |
* @param objectStoreID |
|
118 |
* the objectStoreID to set |
|
119 |
*/ |
|
120 |
public void setObjectStoreID(final String objectStoreID) { |
|
121 |
this.objectStoreID = objectStoreID; |
|
122 |
} |
|
123 |
|
|
124 |
/** |
|
125 |
* Gets the plugin. |
|
126 |
* |
|
127 |
* @return the plugin |
|
128 |
*/ |
|
129 |
public String getPlugin() { |
|
130 |
return plugin; |
|
131 |
} |
|
132 |
|
|
133 |
/** |
|
134 |
* Sets the plugin. |
|
135 |
* |
|
136 |
* @param plugin |
|
137 |
* the plugin to set |
|
138 |
*/ |
|
139 |
public void setPlugin(final String plugin) { |
|
140 |
this.plugin = plugin; |
|
141 |
} |
|
142 |
|
|
143 |
/** |
|
144 |
* Gets the protocol. |
|
145 |
* |
|
146 |
* @return the protol |
|
147 |
*/ |
|
148 |
public String getProtocol() { |
|
149 |
return protocol; |
|
150 |
} |
|
151 |
|
|
152 |
/** |
|
153 |
* Sets the protocol. |
|
154 |
* |
|
155 |
* @param protol |
|
156 |
* the protol to set |
|
157 |
*/ |
|
158 |
public void setProtocol(final String protol) { |
|
159 |
this.protocol = protol; |
|
160 |
} |
|
161 |
|
|
162 |
/** |
|
163 |
* Gets the mime type. |
|
164 |
* |
|
165 |
* @return the mimeType |
|
166 |
*/ |
|
167 |
public String getMimeType() { |
|
168 |
return mimeType; |
|
169 |
} |
|
170 |
|
|
171 |
/** |
|
172 |
* Sets the mime type. |
|
173 |
* |
|
174 |
* @param mimeType |
|
175 |
* the mimeType to set |
|
176 |
*/ |
|
177 |
public void setMimeType(final String mimeType) { |
|
178 |
this.mimeType = mimeType; |
|
179 |
} |
|
180 |
|
|
181 |
/** |
|
182 |
* Sets the inputepr param. |
|
183 |
* |
|
184 |
* @param inputeprParam |
|
185 |
* the inputeprParam to set |
|
186 |
*/ |
|
187 |
public void setInputeprParam(final String inputeprParam) { |
|
188 |
this.inputeprParam = inputeprParam; |
|
189 |
} |
|
190 |
|
|
191 |
/* |
|
192 |
* (non-Javadoc) |
|
193 |
* |
|
194 |
* @see eu.dnetlib.msro.worker.nodes.ProgressJobNode#getProgressProvider() |
|
195 |
*/ |
|
196 |
@Override |
|
197 |
public ProgressProvider getProgressProvider() { |
|
198 |
return progressProvider; |
|
199 |
} |
|
200 |
|
|
201 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/SimpleJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes; |
|
2 |
|
|
3 |
import org.apache.commons.logging.Log; |
|
4 |
import org.apache.commons.logging.LogFactory; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.Engine; |
|
7 |
import com.googlecode.sarasvati.NodeToken; |
|
8 |
|
|
9 |
import eu.dnetlib.msro.worker.WorkflowConstants; |
|
10 |
|
|
11 |
public abstract class SimpleJobNode extends SarasvatiJobNode { |
|
12 |
|
|
13 |
private static final Log log = LogFactory.getLog(SarasvatiJobNode.class); |
|
14 |
|
|
15 |
@Override |
|
16 |
public final void execute(final Engine engine, final NodeToken token) { |
|
17 |
super.execute(engine, token); |
|
18 |
|
|
19 |
try { |
|
20 |
log.debug("START NODE: " + getBeanName()); |
|
21 |
beforeStart(token); |
|
22 |
String arc = execute(token); |
|
23 |
beforeCompleted(token); |
|
24 |
log.debug("END NODE (SUCCESS): " + getBeanName()); |
|
25 |
|
|
26 |
engine.complete(token, arc); |
|
27 |
} catch (Throwable e) { |
|
28 |
log.error("got exception while executing workflow node", e); |
|
29 |
log.debug("END NODE (FAILED): " + getBeanName()); |
|
30 |
beforeFailed(token); |
|
31 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true); |
|
32 |
token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, e.getMessage()); |
|
33 |
engine.complete(token, "failed"); |
|
34 |
} |
|
35 |
} |
|
36 |
|
|
37 |
abstract protected String execute(final NodeToken token) throws Exception; |
|
38 |
|
|
39 |
protected void beforeStart(final NodeToken token) { |
|
40 |
// For optional overwrites |
|
41 |
} |
|
42 |
|
|
43 |
protected void beforeCompleted(final NodeToken token) { |
|
44 |
// For optional overwrites |
|
45 |
} |
|
46 |
|
|
47 |
protected void beforeFailed(final NodeToken token) { |
|
48 |
// For optional overwrites |
|
49 |
} |
|
50 |
|
|
51 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/transform/GroovyJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.transform; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import javax.annotation.Resource; |
|
7 |
import javax.xml.ws.wsaddressing.W3CEndpointReference; |
|
8 |
|
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.springframework.beans.factory.annotation.Required; |
|
12 |
|
|
13 |
import com.google.common.collect.Maps; |
|
14 |
import com.googlecode.sarasvati.Arc; |
|
15 |
import com.googlecode.sarasvati.NodeToken; |
|
16 |
|
|
17 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
18 |
|
|
19 |
import eu.dnetlib.enabling.resultset.MappedResultSetFactory; |
|
20 |
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils; |
|
21 |
import eu.dnetlib.msro.rmi.ManagerServiceException; |
|
22 |
import eu.dnetlib.msro.worker.nodes.SimpleJobNode; |
|
23 |
import groovy.lang.GroovyShell; |
|
24 |
import groovy.util.GroovyScriptEngine; |
|
25 |
|
|
26 |
public class GroovyJobNode extends SimpleJobNode { |
|
27 |
|
|
28 |
private static final Log log = LogFactory.getLog(GroovyJobNode.class); |
|
29 |
|
|
30 |
/** |
|
31 |
* used to transform the records using Groovy. |
|
32 |
*/ |
|
33 |
|
|
34 |
private MappedResultSetFactory mappedResultSetFactory; |
|
35 |
|
|
36 |
private String inputEprParam; |
|
37 |
private String outputEprParam; |
|
38 |
private String transformationRuleId; |
|
39 |
// private String groovyParams; |
|
40 |
|
|
41 |
@Resource |
|
42 |
private DnetServiceLocator serviceLocator; |
|
43 |
|
|
44 |
private Map<String, String> retrieveGroovyParameter() { |
|
45 |
Map<String, String> out = Maps.newHashMap(); |
|
46 |
|
|
47 |
String query = "for $x in collection('/db/DRIVER/GroovyProcessingDSResource/GroovyProcessingDSResourceType')" |
|
48 |
+ "where $x[.//RESOURCE_IDENTIFIER/@value='" + transformationRuleId + "']" |
|
49 |
+ "return concat($x//GROOVY_CLASSPATH/text(),':::',$x//GROOVY_DNETCLASS/text())"; |
|
50 |
try { |
|
51 |
String result = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(query).get(0); |
|
52 |
if (result == null) { return null; } |
|
53 |
String[] data = result.trim().split(":::"); |
|
54 |
if (data.length == 2) { |
|
55 |
out.put("classpath", data[0]); |
|
56 |
out.put("mainClass", data[1]); |
|
57 |
} |
|
58 |
|
|
59 |
return out; |
|
60 |
} catch (Exception e) { |
|
61 |
log.error(e); |
|
62 |
return null; |
|
63 |
} |
|
64 |
} |
|
65 |
|
|
66 |
@Override |
|
67 |
protected String execute(final NodeToken token) throws Exception { |
|
68 |
final String inputEprString = token.getEnv().getAttribute(inputEprParam); |
|
69 |
if (inputEprString == null || inputEprString.isEmpty()) { throw new ManagerServiceException("InputEprParam (" + inputEprParam + ") not found in ENV"); } |
|
70 |
final W3CEndpointReference inputEpr = new EPRUtils().getEpr(inputEprString); |
|
71 |
String groovyClasspath, groovyDnetClass; |
|
72 |
Map<String, String> prop = retrieveGroovyParameter(); |
|
73 |
groovyClasspath = prop.get("classpath"); |
|
74 |
groovyDnetClass = prop.get("mainClass"); |
|
75 |
W3CEndpointReference epr = transformGroovy(inputEpr, groovyClasspath, groovyDnetClass, parseJsonParameters(token)); |
|
76 |
token.getEnv().setAttribute(outputEprParam, epr.toString()); |
|
77 |
return Arc.DEFAULT_ARC; |
|
78 |
} |
|
79 |
|
|
80 |
private W3CEndpointReference transformGroovy(final W3CEndpointReference source, |
|
81 |
final String groovyClasspath, |
|
82 |
final String groovyDnetClass, |
|
83 |
final Map<String, String> params) throws ClassNotFoundException, IOException { |
|
84 |
|
|
85 |
GroovyScriptEngine gse = new GroovyScriptEngine(groovyClasspath); |
|
86 |
gse.getGroovyClassLoader().loadClass(groovyDnetClass); |
|
87 |
log.info("***********************************************"); |
|
88 |
log.info("Loaded Groovy classes:"); |
|
89 |
for (Class<?> c : gse.getGroovyClassLoader().getLoadedClasses()) { |
|
90 |
log.info(c.getCanonicalName()); |
|
91 |
} |
|
92 |
log.info("***********************************************"); |
|
93 |
GroovyShell groovyShell = new GroovyShell(gse.getGroovyClassLoader()); |
|
94 |
|
|
95 |
Object go = groovyShell.evaluate("new " + groovyDnetClass + "()"); |
|
96 |
if (go instanceof GroovyUnaryFunction) { |
|
97 |
GroovyUnaryFunction groovyUnaryFunction = (GroovyUnaryFunction) go; |
|
98 |
if (params != null) { |
|
99 |
groovyUnaryFunction.setParams(params); |
|
100 |
} |
|
101 |
return mappedResultSetFactory.createMappedResultSet(source, groovyUnaryFunction); |
|
102 |
} else { |
|
103 |
throw new RuntimeException("Groovy object " + go + " is not supported"); |
|
104 |
} |
|
105 |
} |
|
106 |
|
|
107 |
public MappedResultSetFactory getMappedResultSetFactory() { |
|
108 |
return mappedResultSetFactory; |
|
109 |
} |
|
110 |
|
|
111 |
@Required |
|
112 |
public void setMappedResultSetFactory(final MappedResultSetFactory mappedResultSetFactory) { |
|
113 |
this.mappedResultSetFactory = mappedResultSetFactory; |
|
114 |
} |
|
115 |
|
|
116 |
public String getInputEprParam() { |
|
117 |
return inputEprParam; |
|
118 |
} |
|
119 |
|
|
120 |
public void setInputEprParam(final String inputEprParam) { |
|
121 |
this.inputEprParam = inputEprParam; |
|
122 |
} |
|
123 |
|
|
124 |
public String getOutputEprParam() { |
|
125 |
return outputEprParam; |
|
126 |
} |
|
127 |
|
|
128 |
public void setOutputEprParam(final String outputEprParam) { |
|
129 |
this.outputEprParam = outputEprParam; |
|
130 |
} |
|
131 |
|
|
132 |
public String getTransformationRuleId() { |
|
133 |
return transformationRuleId; |
|
134 |
} |
|
135 |
|
|
136 |
public void setTransformationRuleId(final String transformationRuleId) { |
|
137 |
this.transformationRuleId = transformationRuleId; |
|
138 |
} |
|
139 |
|
|
140 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/transform/TransformJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.transform; |
|
2 |
|
|
3 |
import org.springframework.beans.factory.annotation.Autowired; |
|
4 |
import org.springframework.beans.factory.annotation.Required; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.Arc; |
|
7 |
import com.googlecode.sarasvati.NodeToken; |
|
8 |
|
|
9 |
import eu.dnetlib.data.resultSet.ResultSetFactory; |
|
10 |
import eu.dnetlib.data.resultSet.transform.TransformerFactory; |
|
11 |
import eu.dnetlib.enabling.datastructures.TransformationRule; |
|
12 |
import eu.dnetlib.enabling.is.client.InformationServiceClient; |
|
13 |
import eu.dnetlib.msro.worker.nodes.SimpleJobNode; |
|
14 |
import eu.dnetlib.rmi.objects.resultSet.ResultSet; |
|
15 |
import eu.dnetlib.rmi.soap.exceptions.ManagerServiceException; |
|
16 |
|
|
17 |
public class TransformJobNode extends SimpleJobNode { |
|
18 |
|
|
19 |
private String inputResultSetParam; |
|
20 |
private String outputResultSetParam; |
|
21 |
private String ruleId; |
|
22 |
|
|
23 |
@Autowired |
|
24 |
private InformationServiceClient isClient; |
|
25 |
|
|
26 |
@Autowired |
|
27 |
private ResultSetFactory resultSetFactory; |
|
28 |
|
|
29 |
private TransformerFactory transformerFactory; |
|
30 |
|
|
31 |
@Override |
|
32 |
protected String execute(final NodeToken token) throws Exception { |
|
33 |
@SuppressWarnings("unchecked") |
|
34 |
final ResultSet<String> inputRs = (ResultSet<String>) token.getEnv().getTransientAttribute(inputResultSetParam); |
|
35 |
if (inputRs == null) { throw new ManagerServiceException("InputEprParam (" + inputResultSetParam + ") not found in ENV"); } |
|
36 |
|
|
37 |
final TransformationRule rule = isClient.getResourceByCode(ruleId, TransformationRule.class); |
|
38 |
|
|
39 |
final ResultSet<String> outputRs = resultSetFactory.transform(inputRs, transformerFactory.createTransformer(rule)); |
|
40 |
|
|
41 |
token.getEnv().setAttribute(outputResultSetParam, outputRs.toString()); |
|
42 |
|
|
43 |
return Arc.DEFAULT_ARC; |
|
44 |
} |
|
45 |
|
|
46 |
public String getRuleId() { |
|
47 |
return ruleId; |
|
48 |
} |
|
49 |
|
|
50 |
public void setRuleId(final String ruleId) { |
|
51 |
this.ruleId = ruleId; |
|
52 |
} |
|
53 |
|
|
54 |
public String getInputResultSetParam() { |
|
55 |
return inputResultSetParam; |
|
56 |
} |
|
57 |
|
|
58 |
public void setInputResultSetParam(final String inputResultSetParam) { |
|
59 |
this.inputResultSetParam = inputResultSetParam; |
|
60 |
} |
|
61 |
|
|
62 |
public String getOutputResultSetParam() { |
|
63 |
return outputResultSetParam; |
|
64 |
} |
|
65 |
|
|
66 |
public void setOutputResultSetParam(final String outputResultSetParam) { |
|
67 |
this.outputResultSetParam = outputResultSetParam; |
|
68 |
} |
|
69 |
|
|
70 |
public TransformerFactory getTransformerFactory() { |
|
71 |
return transformerFactory; |
|
72 |
} |
|
73 |
|
|
74 |
@Required |
|
75 |
public void setTransformerFactory(final TransformerFactory transformerFactory) { |
|
76 |
this.transformerFactory = transformerFactory; |
|
77 |
} |
|
78 |
|
|
79 |
} |
modules/dnet-node-services/trunk/src/main/java/eu/dnetlib/msro/worker/nodes/mdstore/MultipleMdStoreIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.worker.nodes.mdstore; |
|
2 |
|
|
3 |
import java.util.Iterator; |
|
4 |
import java.util.List; |
|
5 |
|
|
6 |
import javax.annotation.Resource; |
|
7 |
import javax.xml.ws.wsaddressing.W3CEndpointReference; |
|
8 |
|
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
import org.springframework.beans.factory.annotation.Autowired; |
|
12 |
|
|
13 |
import eu.dnetlib.data.mdstore.MDStoreService; |
|
14 |
import eu.dnetlib.data.mdstore.MDStoreServiceException; |
|
15 |
|
|
16 |
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory; |
|
17 |
|
|
18 |
// TODO: Auto-generated Javadoc |
|
19 |
/** |
|
20 |
* The Class MultipleMdStoreIterator. |
|
21 |
*/ |
|
22 |
public class MultipleMdStoreIterator implements Iterator<String> { |
|
23 |
|
|
24 |
private static final Log log = LogFactory.getLog(MultipleMdStoreIterator.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
25 |
|
|
26 |
/** The service locator. */ |
|
27 |
@Resource |
|
28 |
private DnetServiceLocator serviceLocator; |
|
29 |
|
|
30 |
/** The md i ds. */ |
|
31 |
private List<String> mdIDs; |
|
32 |
|
|
33 |
/** The current id. */ |
|
34 |
private String currentId = null; |
|
35 |
|
|
36 |
/** The current iterator. */ |
|
37 |
private Iterator<String> currentIterator; |
|
38 |
|
|
39 |
/** The result set client factory. */ |
|
40 |
@Autowired |
|
41 |
private ResultSetClientFactory resultSetClientFactory; |
|
42 |
|
|
43 |
/** |
|
44 |
* Instantiates a new multiple md store iterator. |
|
45 |
* |
|
46 |
* @param mdstoreLocator |
|
47 |
* the mdstore locator |
|
48 |
* @param mdIds |
|
49 |
* the md ids |
|
50 |
*/ |
|
51 |
public MultipleMdStoreIterator(final DnetServiceLocator serviceLocator, final List<String> mdIds, final ResultSetClientFactory resultSetClientFactory) { |
|
52 |
this.serviceLocator = serviceLocator; |
|
53 |
this.mdIDs = mdIds; |
|
54 |
this.resultSetClientFactory = resultSetClientFactory; |
|
55 |
getNextMDStoreRecords(); |
|
56 |
|
|
57 |
} |
|
58 |
|
|
59 |
/* |
|
60 |
* (non-Javadoc) |
|
61 |
* |
|
62 |
* @see java.util.Iterator#hasNext() |
|
63 |
*/ |
|
64 |
@Override |
|
65 |
public boolean hasNext() { |
|
66 |
if (currentId == null || currentIterator == null) { return false; } |
|
67 |
return currentIterator.hasNext(); |
|
68 |
} |
|
69 |
|
|
70 |
/* |
|
71 |
* (non-Javadoc) |
|
72 |
* |
|
73 |
* @see java.util.Iterator#next() |
|
74 |
*/ |
|
75 |
@Override |
|
76 |
public String next() { |
|
77 |
String nextElement = currentIterator.next(); |
|
78 |
if (!currentIterator.hasNext()) { |
|
79 |
getNextMDStoreRecords(); |
|
80 |
} |
|
81 |
return nextElement; |
|
82 |
} |
|
83 |
|
|
84 |
/* |
|
85 |
* (non-Javadoc) |
|
86 |
* |
|
87 |
* @see java.util.Iterator#remove() |
|
88 |
*/ |
|
89 |
@Override |
|
90 |
public void remove() { |
|
91 |
currentIterator.remove(); |
|
92 |
} |
|
93 |
|
|
94 |
/** |
|
95 |
* Gets the next md store records. |
|
96 |
* |
|
97 |
* @return the next md store records |
|
98 |
*/ |
|
99 |
private void getNextMDStoreRecords() { |
|
100 |
if (mdIDs.size() > 0) { |
|
101 |
currentId = mdIDs.remove(0); |
|
102 |
currentIterator = getIterableResultset(currentId); |
|
103 |
} |
|
104 |
} |
|
105 |
|
|
106 |
/** |
|
107 |
* Gets the iterable resultset. |
|
108 |
* |
|
109 |
* @param id |
|
110 |
* the id |
|
111 |
* @return the iterable resultset |
|
112 |
*/ |
|
113 |
private Iterator<String> getIterableResultset(final String id) { |
|
114 |
try { |
|
115 |
W3CEndpointReference epr = serviceLocator.getService(MDStoreService.class, id).deliverMDRecords(id, "", "", ""); |
|
116 |
Iterable<String> input = resultSetClientFactory.getClient(epr); |
Also available in: Unified diff
Partial wf nodes reimplementation