Alessia Bardi, 06/02/2015 05:06 PM
Creating a D-Net Workflow¶
There are two types of workflows:
- collection workflows: perform the systematic collection of metadata or files from data sources;
- processing workflows: implement the sequence of steps needed to stage the collected metadata (or files) and form the target Information Spaces.
Defining a collection workflow¶
COMING SOON
Defining a processing workflow¶
Defining a processing workflow basically consists of creating (at least) two D-Net profiles:
- 1 meta_workflow (MetaWorkflowDSResourceType): it describes a set of related workflows that can run in parallel or in sequence to accomplish a high-level functionality.
  - For example, to export the Information Space via OAI-PMH we first have to feed the OAI-PMH back-end (workflow 1: "OAI store feed"), then we can configure the back-end with the needed indexes (workflow 2: "OAI Post feed").
- 1 (or more) workflow (WorkflowDSResourceType): it defines the steps to execute (in sequence or in parallel).
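The sequence/parallel composition that a meta_workflow describes can be pictured with a toy Java model. This is not D-Net code and does not show how a meta_workflow profile is written: `MetaWorkflowSketch` and `workflow(...)` are hypothetical names, and each "workflow" only records its name. It shows the OAI-PMH example as a strict sequence, and independent workflows started together.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Toy model of a meta_workflow, NOT the D-Net API: it only illustrates
// composing workflows in sequence and in parallel.
final class MetaWorkflowSketch {

    // Hypothetical stand-in for launching a workflow: it just logs its name.
    static Runnable workflow(String name, List<String> log) {
        return () -> log.add(name);
    }

    // Sequence: "OAI Post feed" starts only after "OAI store feed" has completed.
    static List<String> runOaiExport() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture
                .runAsync(workflow("OAI store feed", log))
                .thenRun(workflow("OAI Post feed", log))
                .join();
        return log;
    }

    // Parallel: independent workflows are started together and awaited as a group.
    static List<String> runInParallel(List<String> names) {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<?>[] running = names.stream()
                .map(name -> CompletableFuture.runAsync(workflow(name, log)))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(running).join();
        return log;
    }
}
```

In the parallel case the order of the log entries is not defined; only the sequence case guarantees that "OAI store feed" comes first.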
Workflow definition¶
Let's see a simple example of a workflow with only one step that performs a sleep:
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <HEADER>
        <RESOURCE_IDENTIFIER value="wfUUID_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
        <RESOURCE_KIND value="WorkflowDSResources"/>
        <RESOURCE_URI value=""/>
        <DATE_OF_CREATION value="2015-02-06T11:13:51.0Z"/>
    </HEADER>
    <BODY>
        <WORKFLOW_NAME>Sleeping Beauty</WORKFLOW_NAME>
        <WORKFLOW_TYPE>Tutorial</WORKFLOW_TYPE>
        <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
        <CONFIGURATION start="manual">
            <NODE name="sleep" isStart="true" type="Sleep">
                <DESCRIPTION>Sleep for a while</DESCRIPTION>
                <PARAMETERS>
                    <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
                </PARAMETERS>
                <ARCS>
                    <ARC to="success"/>
                </ARCS>
            </NODE>
        </CONFIGURATION>
        <STATUS/>
    </BODY>
</RESOURCE_PROFILE>

The CONFIGURATION element of the profile tells us that the workflow is composed of one step. The step is defined by the NODE element:
- name="sleep": the name that will appear in the UI when showing the workflow.
- isStart="true": marks the node as a start node. Each workflow must have at least one start node.
- type="Sleep": the type of the node. Through this value we refer to a Java bean named wfNodeSleep, whose class implements the business logic. Therefore we expect a bean like the following to be defined in an application-context XML:
  <bean id="wfNodeSleep" class="eu.dnetlib.msro.tutorial.SleepJobNode" scope="prototype"/>
  Note that the attribute scope="prototype" is important because it tells the Spring framework to create a new instance of the bean every time that bean is requested. We'll see how the actual class SleepJobNode is implemented in a while.
- DESCRIPTION: brief description of the logic of the node. It will appear in the UI when showing the workflow.
- PARAMETERS: parameters needed by the node. A parameter (PARAM) has the following attributes:
  - name and type: name and type of the param. The Java class must define a property (accessible via public getter/setter) with the same name and type.
  - required: tells whether the param is required. If it is, the workflow can run only if the param is set.
  - managedBy: who is in charge of setting the value. Allowed values are:
    - user: the UI will let the user set the value;
    - system: the UI will not let the user set the value, as the system is expected to set the parameter automatically (more on this in a while).
  - More attributes are available. TODO: add other optional attributes for parameter (e.g. function)
- ARCS: which node must be executed after this node. In the example, we simply go to the success node, a pre-defined node that completes the workflow successfully. If there is an exception in one node, the pre-defined failure node is executed automatically.
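The type and PARAM conventions above can be made concrete with a small Java sketch. It is not the D-Net implementation: `ParamBindingSketch`, `beanIdFor`, `bind` and the dependency-free `SleepNode` are hypothetical names. It assumes only what the text states: the bean id is "wfNode" followed by the node type, each PARAM is set through the public setter of the property with the same name and type, and a required PARAM must have a value.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, NOT the D-Net implementation: it only mirrors the
// conventions described above for the NODE type and the PARAM elements.
final class ParamBindingSketch {

    // type="Sleep" -> Spring bean id "wfNodeSleep"
    static String beanIdFor(String nodeType) {
        return "wfNode" + nodeType;
    }

    // Refuses to run if a required PARAM has no value, then sets every PARAM
    // through the public setter of the bean property with the same name.
    static void bind(Object node, Map<String, String> params, Set<String> required) {
        for (String name : required) {
            if (params.get(name) == null) {
                throw new IllegalStateException("Missing required param: " + name);
            }
        }
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(node.getClass()).getPropertyDescriptors()) {
                String raw = params.get(pd.getName());
                if (raw == null || pd.getWriteMethod() == null) {
                    continue; // no value supplied, or read-only property such as "class"
                }
                // Only the "int" and "string" PARAM types are converted in this sketch.
                Object value = pd.getPropertyType() == int.class ? Integer.parseInt(raw.trim()) : raw;
                pd.getWriteMethod().invoke(node, value);
            }
        } catch (IntrospectionException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Cannot bind params to " + node.getClass().getName(), e);
        }
    }

    // Dependency-free stand-in for SleepJobNode, exposing the same property.
    public static class SleepNode {
        private int sleepTimeMs;

        public int getSleepTimeMs() { return sleepTimeMs; }

        public void setSleepTimeMs(int time) { sleepTimeMs = time; }
    }
}
```

With the profile above, binding {sleepTimeMs=1000} sets the property to 1000, while an empty parameter map is rejected because sleepTimeMs is declared required="true".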
Now let's see how the business logic of the node is implemented in the Java class eu.dnetlib.msro.tutorial.SleepJobNode:
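package eu.dnetlib.msro.tutorial;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class SleepJobNode extends SimpleJobNode {

    private int sleepTimeMs;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        // exception handling removed for readability
        Thread.sleep(sleepTimeMs);
        // follow the default arc, i.e. the ARC declared without a name
        return Arc.DEFAULT_ARC;
    }

    public int getSleepTimeMs() { return sleepTimeMs; }

    public void setSleepTimeMs(final int time) { sleepTimeMs = time; }
}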
SleepJobNode extends the class eu.dnetlib.msro.workflows.nodes.SimpleJobNode, a D-Net class extending the Sarasvati class com.googlecode.sarasvati.mem.MemNode. The method to override is
abstract protected String execute(final NodeToken token) throws Exception;
which must return the name of the arc to follow.
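On the engine side, the returned name is matched against the ARC elements of the node to pick the next node. The toy model below is not Sarasvati's API (`ArcRoutingSketch` and `runAndRoute` are hypothetical), and storing an unnamed ARC under the empty string is an assumption of the sketch. Each ARC is represented as an entry from arc name to target node.

```java
import java.util.Map;
import java.util.function.Supplier;

// Toy model of arc selection, NOT the Sarasvati engine: the ARC elements of a
// node are represented as a map from arc name to target node name.
final class ArcRoutingSketch {

    // Assumption of this sketch: an ARC without a name attribute is stored under "".
    static final String DEFAULT_ARC = "";

    // Runs the node logic, then returns the target of the arc whose name it returned.
    static String runAndRoute(Supplier<String> execute, Map<String, String> arcs) {
        String arcName = execute.get();
        String target = arcs.get(arcName);
        if (target == null) {
            throw new IllegalStateException("Node returned an unknown arc: '" + arcName + "'");
        }
        return target;
    }
}
```

For the single `<ARC to="success"/>` of the profile above, the map would hold one entry, "" → "success", so returning the default arc leads to the success node.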
Through this mechanism you can guide the workflow along one path or another, depending on what happens during the execution of the current step. For example, suppose the node should:
- sleep and go to success if sleepTimeMs < 2000;
- sleep and go to failure if sleepTimeMs >= 2000.
Our class should change to something like:
public class SleepJobNode extends SimpleJobNode {

    private int sleepTimeMs;

    @Override
    protected String execute(final NodeToken token) throws Exception {
        // exception handling removed for readability
        Thread.sleep(sleepTimeMs);
        // the returned string is the name of the arc to follow
        if (sleepTimeMs < 2000) {
            return "ok";
        } else {
            return "oops";
        }
    }

    public int getSleepTimeMs() { return sleepTimeMs; }

    public void setSleepTimeMs(final int time) { sleepTimeMs = time; }
}
The workflow must then be adapted to handle the two different arcs, "ok" and "oops":
<NODE name="sleep" isStart="true" type="Sleep">
    <DESCRIPTION>Sleep for a while</DESCRIPTION>
    <PARAMETERS>
        <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
    </PARAMETERS>
    <ARCS>
        <ARC name="ok" to="success"/>
        <ARC name="oops" to="failure"/>
    </ARCS>
</NODE>

D-Net offers three JobNode super classes to choose from:
- eu.dnetlib.msro.workflows.nodes.SimpleJobNode: synchronous execution. It runs the execute method in the main thread.
- eu.dnetlib.msro.workflows.nodes.AsyncJobNode: asynchronous execution. It runs the execute method in a separate thread.
- eu.dnetlib.msro.workflows.nodes.BlackboardJobNode: use this class when your node must perform a blackboard request to a D-Net service.
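The difference between the first two super classes can be pictured with plain java.util.concurrent. This is not how SimpleJobNode or AsyncJobNode are implemented; `ExecutionModesSketch` is a hypothetical name, and the sketch only shows the same logic run in the calling thread versus handed to another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Illustration only, NOT the D-Net classes: the same node logic run
// synchronously (caller's thread) or asynchronously (another thread).
final class ExecutionModesSketch {

    // Like SimpleJobNode: the caller's thread runs the logic and waits for its result.
    static <T> T runSync(Supplier<T> execute) {
        return execute.get();
    }

    // Like AsyncJobNode: the logic is handed to another thread; the caller
    // immediately gets a future and can wait on it later.
    static <T> CompletableFuture<T> runAsync(Supplier<T> execute, Executor executor) {
        return CompletableFuture.supplyAsync(execute, executor);
    }
}
```

Passing `Thread::currentThread` as the logic makes the difference visible: the synchronous call returns the caller's own thread, the asynchronous one returns a pool thread.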
Extending BlackboardJobNode¶
D-Net services can implement the BlackBoard (BB) protocol, which allows them to receive messages. If a message is known to a service, the service executes the corresponding action and notifies the caller about its execution status by updating the blackboard in the service profile. Reading and writing BB messages is already implemented in the BlackboardJobNode class, so you only have to:
- specify the profile id of the service you want to call, by overriding the method
  abstract protected String obtainServiceId(NodeToken token);
- prepare the BB message, by overriding the method
  abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;
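To picture the exchange that BlackboardJobNode takes care of, here is a self-contained, in-memory toy. Everything in it is hypothetical and differs from the real D-Net BlackboardJob API. The blackboard is a map inside a fake service, the status values ONGOING/DONE/FAILED are assumptions of the sketch, and the toy handles each message immediately in the same thread. The caller writes a message (action plus parameters), the service executes known actions and records the outcome, and the caller reads the status back.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// In-memory toy of the blackboard exchange, NOT the D-Net BlackboardJob API.
final class BlackboardSketch {

    // Status names are assumptions of this sketch.
    enum Status { ONGOING, DONE, FAILED }

    // A message written on the blackboard: an action plus string parameters.
    static final class Message {
        final String action;
        final Map<String, String> parameters;
        Status status = Status.ONGOING;

        Message(String action, Map<String, String> parameters) {
            this.action = action;
            this.parameters = parameters;
        }
    }

    // Fake service whose "profile" holds a blackboard of messages keyed by id.
    static final class SleepService {
        private final Map<String, Message> blackboard = new HashMap<>();

        // Caller side: write the message and get its id back.
        // For simplicity the toy handles it right away, in the same thread.
        String post(Message message) {
            String id = UUID.randomUUID().toString();
            blackboard.put(id, message);
            handle(message);
            return id;
        }

        // Caller side: read back the status written by the service.
        Status statusOf(String id) {
            return blackboard.get(id).status;
        }

        // Service side: execute known actions, mark unknown ones as failed.
        private void handle(Message message) {
            if (!"SLEEP".equals(message.action)) {
                message.status = Status.FAILED;
                return;
            }
            try {
                Thread.sleep(Long.parseLong(message.parameters.get("sleepTimeMs")));
                message.status = Status.DONE;
            } catch (IllegalArgumentException e) { // missing, malformed or negative value
                message.status = Status.FAILED;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                message.status = Status.FAILED;
            }
        }
    }
}
```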
Let's suppose we have a service accepting the BB message "SLEEP", which requires a parameter named sleepTimeMs. Our SleepJobNode class becomes:
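import java.util.List;

import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;

public class SleepJobNode extends BlackboardJobNode {

    private String xqueryForServiceId;
    private int sleepTimeMs;

    @Override
    protected String obtainServiceId(final NodeToken token) {
        // ask the IS lookup service for the ids of the profiles matching the xquery
        // (exception handling removed for readability)
        final List<String> serviceIds = getServiceLocator().getService(ISLookUpService.class).quickSearchProfile(xqueryForServiceId);
        if (serviceIds.isEmpty()) {
            throw new RuntimeException("Service id not found using query: " + xqueryForServiceId);
        }
        return serviceIds.get(0);
    }

    @Override
    protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
        // the BB message: the action known by the service plus its parameters
        job.setAction("SLEEP");
        // BB parameter values are strings, hence the conversion
        job.getParameters().put("sleepTimeMs", String.valueOf(sleepTimeMs));
    }

    public int getSleepTimeMs() { return sleepTimeMs; }

    public void setSleepTimeMs(final int time) { sleepTimeMs = time; }

    public String getXqueryForServiceId() { return xqueryForServiceId; }

    public void setXqueryForServiceId(final String xqueryForServiceId) { this.xqueryForServiceId = xqueryForServiceId; }
}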
The workflow node must also declare a new parameter for xqueryForServiceId:
<NODE name="sleep" isStart="true" type="Sleep">
    <DESCRIPTION>Sleep for a while</DESCRIPTION>
    <PARAMETERS>
        <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
        <PARAM name="xqueryForServiceId" managedBy="user" type="string" required="true">/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='SleepServiceResourceType']/HEADER/RESOURCE_IDENTIFIER/@value/string()</PARAM>
    </PARAMETERS>
    <ARCS>
        <ARC to="success"/>
    </ARCS>
</NODE>
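To check what the xquery selects, the sketch below evaluates an XPath 1.0 version of it against a minimal, made-up service profile. The trailing /string() step is dropped because javax.xml.xpath only supports XPath 1.0, and the identifier "sleep-service-1" in the usage is invented. In D-Net the query is run by the IS lookup service through quickSearchProfile, not like this; `ServiceIdQuerySketch` is a hypothetical name.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Local check of the xqueryForServiceId logic, NOT how D-Net evaluates it.
final class ServiceIdQuerySketch {

    // XPath 1.0 counterpart of the xqueryForServiceId parameter value.
    static final String QUERY =
            "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='SleepServiceResourceType']"
            + "/HEADER/RESOURCE_IDENTIFIER/@value";

    // Returns the profile identifier, or "" if the profile is not a SleepService.
    static String serviceId(String profileXml) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            return xpath.evaluate(QUERY, new InputSource(new StringReader(profileXml)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate query", e);
        }
    }
}
```

For a profile whose HEADER contains `<RESOURCE_IDENTIFIER value="sleep-service-1"/>` and `<RESOURCE_TYPE value="SleepServiceResourceType"/>`, serviceId returns "sleep-service-1". For any other resource type it returns an empty string, which corresponds to the empty list checked in obtainServiceId.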
But what if one of your params should not be set by an end user, but rather be picked from the workflow env?
Let's suppose this is the case for the xquery. Then the SleepJobNode class should expect to receive the name of the env parameter in which to look for the actual xquery to use:
public class SleepJobNode extends BlackboardJobNode {

    private String xqueryForServiceIdParam;
    . . .

    @Override
    protected String obtainServiceId(final NodeToken token) {
        // the param holds the NAME of the env attribute that contains the actual xquery
        final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam());
        List<String> serviceIds = getServiceLocator().getService(ISLookUpService.class).quickSearchProfile(xquery);
        . . .
    }
    . . .
}
<NODE name="sleep" isStart="true" type="Sleep">
    <DESCRIPTION>Sleep for a while</DESCRIPTION>
    <PARAMETERS>
        <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
        <PARAM managedBy="system" name="xqueryForServiceIdParam" required="true" type="string">xqueryForServiceId_in_env</PARAM>
    </PARAMETERS>
    <ARCS>
        <ARC to="success"/>
    </ARCS>
</NODE>
Special parameters¶
Note that, in case of parameter name clashes, the precedence order is sysparams < envparams < params: parameters declared in the XML node override the parameters in the env, which in turn override the sys params.
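The precedence rule can be sketched as three successive map merges, where a later merge overwrites clashing names. This only illustrates the ordering stated above; `ParamPrecedenceSketch` and `resolve` are hypothetical and not part of D-Net.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the precedence sysparams < envparams < params:
// each putAll overwrites the values of clashing names.
final class ParamPrecedenceSketch {

    static Map<String, String> resolve(Map<String, String> sysParams,
                                       Map<String, String> envParams,
                                       Map<String, String> nodeParams) {
        Map<String, String> resolved = new HashMap<>(sysParams); // lowest priority
        resolved.putAll(envParams);  // env values override sys params
        resolved.putAll(nodeParams); // XML node params override everything
        return resolved;
    }
}
```

If the name "c" is defined at all three levels, its resolved value comes from the XML node; a name defined only as a sys param keeps its system value.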
envParams¶
envParams is a special parameter that allows you to map one value of the wf env to another parameter of the same wf env. This can be useful when different nodes of the same workflow need the same value but are coded to read it from different env parameters.
Example:
<PARAM required="true" type="string" name="envParams" managedBy="system">
    { 'path' : 'rottenRecordsPath' }
</PARAM>
The value of envParams is a list of parameter mappings. In the example there is one mapping:
- read the env parameter named "rottenRecordsPath" and copy its value into another env parameter named "path".
Given the above configuration, a node will find the value it needs in the env parameter it expects ("path").
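The effect of the mapping can be sketched in a few lines of Java. This is a hypothetical illustration, not D-Net code: the mapping is passed as a ready-made Map instead of being parsed from the PARAM text, the env is modelled as a plain map of strings, and the value "/tmp/rotten" used in the usage is invented.

```java
import java.util.Map;

// Hypothetical illustration of envParams, NOT D-Net code. Each mapping entry
// reads target -> source, as in { 'path' : 'rottenRecordsPath' }.
final class EnvParamsSketch {

    // Copies the value of each source env parameter into its target parameter.
    // The source parameter is left untouched.
    static void apply(Map<String, String> mapping, Map<String, String> env) {
        mapping.forEach((target, source) -> {
            if (env.containsKey(source)) {
                env.put(target, env.get(source));
            }
        });
    }
}
```

With env = {rottenRecordsPath=/tmp/rotten} and the mapping {path=rottenRecordsPath}, both "rottenRecordsPath" and "path" end up holding "/tmp/rotten".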
sysParams¶
sysParams is a special parameter that allows you to set a parameter with a value coming from a system property (those you set in *.properties files).
Example:
<PARAM required="true" type="string" name="sysParams" managedBy="system">
    {
        'hbase.mapred.inputtable' : 'hbase.mapred.datatable',
        'hbase.mapreduce.inputtable' : 'hbase.mapred.datatable'
    }
</PARAM>
The value of sysParams is a list of parameter mappings. In the example there are 2 mappings: the value of the system property "hbase.mapred.datatable" is copied into two new parameters, "hbase.mapred.inputtable" and "hbase.mapreduce.inputtable".
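The same mapping idea applied to system properties can be sketched with java.util.Properties. This is a hypothetical illustration, not D-Net code: the mapping is again a ready-made Map (new param → system property name), and the property value "db_openaire" in the usage is invented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of sysParams, NOT D-Net code. Each mapping entry
// reads newParam -> systemPropertyName.
final class SysParamsSketch {

    // Builds the new parameters from the values of the referenced system properties;
    // properties that are not defined produce no parameter.
    static Map<String, String> resolve(Map<String, String> mapping, Properties systemProps) {
        Map<String, String> params = new HashMap<>();
        mapping.forEach((param, property) -> {
            String value = systemProps.getProperty(property);
            if (value != null) {
                params.put(param, value);
            }
        });
        return params;
    }
}
```

With hbase.mapred.datatable=db_openaire in the properties, the two mappings of the example both resolve to "db_openaire".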