Project

General

Profile

Actions

Creating a D-Net Workflow

Typologies of workflows:
  • collection workflows: operate the systematic collection of metadata or files from data sources;
  • processing workflows: implement the sequence of steps needed to stage collected metadata (or files) and form the target Information Spaces

Prerequisites

In order to implement workflow nodes you need to depend on the maven artifact dnet-msro-service providing the building blocks. Currently the latest release is [3.0.0], and this module is work in progress.

<dependency>
    <groupId>eu.dnetlib</groupId>
    <artifactId>dnet-msro-service</artifactId>
    <version>[3.0.0,4.0.0)</version>
</dependency>

Definition of workflow

The definition of a processing workflow basically consists in the creation of (at least) 2 D-Net profiles:
  1. One (or more) workflow (WorkflowDSResourceType) : it defines the sequence of steps to execute (in sequence or in parallel).
  2. One meta-workflow (MetaWorkflowDSResourceType): it describes a set of related workflows that can run in parallel or in sequence to accomplish a high-level functionality.
    • For example, in order to export the Information Space via OAI-PMH we have first to feed the OAI-PMH back-end (workflow 1: "OAI store feed"), then we can configure the back-end with the needed indexes (workflow 2: "OAI Post feed").

Workflow definition

Let's see a simple example of a workflow with only one step that performs a sleep:

<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <HEADER>
        <RESOURCE_IDENTIFIER value="wfUUID_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
        <RESOURCE_KIND value="WorkflowDSResources"/>
        <RESOURCE_URI value=""/>
        <DATE_OF_CREATION value="2015-02-06T11:13:51.0Z"/>
    </HEADER>
    <BODY>
        <WORKFLOW_NAME>Sleeping Beauty</WORKFLOW_NAME>
        <WORKFLOW_TYPE>Tutorial</WORKFLOW_TYPE>
        <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
        <CONFIGURATION start="manual">
            <NODE name="sleep" isStart="true" type="Sleep" >
                <DESCRIPTION>Sleep for a while</DESCRIPTION>
                <PARAMETERS>
                    <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
                </PARAMETERS>
                <ARCS>
                    <ARC to="success"/>
                </ARCS>
            </NODE>
        </CONFIGURATION>
        <STATUS/>
    </BODY>
</RESOURCE_PROFILE>

The CONFIGURATION element of the profile tells us that the workflow is composed of one step.
The step is defined by the NODE element:
  • name="sleep": the name that will appear in the UI when showing the workflow
  • isStart="true": marks the node as starting node. Each workflow must have at least one start node
  • type="Sleep": the type of the node. Through this value we refer to a Java bean with name wfNodeSleep, whose class implements the business logic. Therefore we expect a bean like the following to be defined in an application-context XML:
    <bean id="wfNodeSleep" class="eu.dnetlib.msro.tutorial.SleepJobNode" scope="prototype"/>
    

    Note that the attribute scope="prototype" is important because it tells the Spring framework to create a new bean instance of the object every time a request for that specific bean is made.
    We'll see how the actual class SleepJobNode in a while.
  • DESCRIPTION: brief description of the logics of the node. It will appear in the UI when showing the workflow
  • PARAMETERS: parameters needed by the node. A parameter (PARAM), has the follwoing attributes:
    • name and type: name and type of the param. The Java class must define a property (accessible via public getter/setter) with the same name and type.
    • required: tells if the param is required or not. If it is, then the workflow can be run only if the param is set.
    • managedBy: who's in charge of setting the value. Allowed values are:
      • user: the UI will enable user to set the value
      • system: the UI will not enable the user to set the value, as the system is expected to set the parameter automatically (more on this in a while)
      • More attributes are available. TODO: add other optional attributes for parameter (e.g. function)
  • ARCS: which node must be executed after this node. In the example, we simply go to the success node, a pre-defined node that completes the workflow successfully. If there is an exception in one node, the pre-defined failure node is executed automatically.

JobNode types

The D-Net workflow engine is based on Sarasvati and offers three JobNode super classes to choose from, depending on the specific workflow requirements. Every jobNode extends the Sarasvati class

com.googlecode.sarasvati.mem.MemNode

and defines the following abstract method to override

abstract protected String execute(final NodeToken token) throws Exception;

that must return the name of the arc to follow.
Through this mechanism you can guide the workflow to execute a path or another depending on what's happening during the execution of the current step.

The D-Net workflow engine offers three JobNode super classes to choose from, depending on the specific workflow requirements:

eu.dnetlib.msro.workflows.nodes.SimpleJobNode

Basic workflow node implementation. The execution is synchronous, and it runs the execute method in the main thread. The use of this kind of job nodes is suited for short lasting synchronous service calls, e.g. isLookup, mapped resultsets. This node is synchronous with sarasvati, which won't be able to execute in parallel more than one instance of this nodes at the same time. To overcome this limitation, use the following node type.

eu.dnetlib.msro.workflows.nodes.AsyncJobNode

The AsyncJob nodes delegates the execution of the execute method in a separated thread. This implies that the jobNode is decoupled from sarasvati, allowing parallel execution of different AsyncJobNodes at the same time. The workflow doesn't advance until the delegated thread running the execute method doesn't complete.

eu.dnetlib.msro.workflows.nodes.BlackboardJobNode

The BlackboardJobNode allows to send blackboard messages to D-Net services implementing the blackboard protocol, and differently from the previous two, exposes a different signature:

abstract protected String obtainServiceId(NodeToken token);

abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;

JobNode examples

Now let's see how the business logic of the node is implemented in the Java class eu.dnetlib.msro.tutorial.SleepJobNode:

Let's suppose we want to:
  • sleep and go to success if sleepTimeMs < 2000
  • sleep and go to failure if sleepTimeMs >= 2000
public class SleepJobNode extends SimpleJobNode{
   private int sleepTimeMs;

   @Override
   protected String execute(final NodeToken token) throws Exception {
      //removed exception handling for readability
      Thread.sleep(sleepTimeMs);
      if(sleepTimeMs < 2000) 
         return "ok";
      else 
         return "oops";
   }

   public int getSleepTimeMs(){ return sleepTimeMs;}
   public void setSleepTimeMs(int time){ sleepTimeMs = time;}
}

and the workflow should be adapted to consider the two different arcs: "ok" and "oops"

<NODE name="sleep" isStart="true" type="Sleep" >
   <DESCRIPTION>Sleep for a while</DESCRIPTION>
   <PARAMETERS>
      <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
   </PARAMETERS>
   <ARCS>
      <ARC name="ok" to="success"/>
      <ARC name="oops" to="failure"/>
   </ARCS>
</NODE>

Extending BlackboardJobNode

D-Net services can implement the BlackBoard (BB) protocol, which allows them to receive messages.
If a message is known to a service, the service will execute the corresponding action and notify the caller about its execution status by updating the blackboard in the service profile. The management of reading/writing BB messages is implemented in the BlackboardJobNode class, so that you only have to:
  • Specify the profile id of the service you want to call by overriding the method
    abstract protected String obtainServiceId(NodeToken token);
  • Prepare the BB message by overriding the method
    abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;

Let's suppose we have a service accepting the BB message "SLEEP" that requires a parameter named sleepTimeMs.
Our SleepJobNode class becomes:

public class SleepJobNode extends BlackboardJobNode{
   private String xqueryForServiceId;

   private int sleepTimeMs;

   @Override
   protected String obtainServiceId(final NodeToken token) {
       return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xqueryForServiceId);
   }   

   @Override
   protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
      job.setAction("SLEEP");
      job.getParameters().put("sleepTimeMs", sleepTimeMs);
   }

   public int getSleepTimeMs(){ return sleepTimeMs;}
   public void setSleepTimeMs(int time){ sleepTimeMs = time;}
   public String getXqueryForServiceId() { return xqueryForServiceId;}
   public void setXqueryForServiceId(final String xqueryForServiceId) { this.xqueryForServiceId = xqueryForServiceId;}
}

And the workflow node must declare a new parameter for the xqueryForServiceId:

<NODE name="sleep" isStart="true" type="Sleep">
    <DESCRIPTION>Sleep for a while</DESCRIPTION>
    <PARAMETERS>
        <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
        <PARAM name="xqueryForServiceId" managedBy="user" type="string" required="true">/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='SleepServiceResourceType']/HEADER/RESOURCE_IDENTIFIER/@value/string()</PARAM>
    </PARAMETERS>
    <ARCS>
        <ARC to="success"/>
    </ARCS>
</NODE>

But what if one of your params shuld not be set by an end-user but it should rather be picked from the workflow env?
Let's suppose this is the case for the xquery. Then the SleepJobNode class should expect to receive the name of the env parameter where to look for the actual xquery to use:

public class SleepJobNode extends BlackboardJobNode{
   private String xqueryForServiceIdParam;
   . . . 
   @Override
   protected String obtainServiceId(final NodeToken token) {
       final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam());
       return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
   }   
. . . 

<NODE name="sleep" isStart="true" type="Sleep">
    <DESCRIPTION>Sleep for a while</DESCRIPTION>
    <PARAMETERS>
        <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
        <PARAM managedBy="system" name="xqueryForServiceIdParam" required="true" type="string">xqueryForServiceId_in_env</PARAM>
    </PARAMETERS>
    <ARCS>
        <ARC to="success"/>
    </ARCS>
</NODE>

Special parameters

Note that in case of parameter name clashes, the order is sysparams < envparams < params, thus XML node params override params in the env, which in turns override sys params.

envParams

envParams is a special parameter that allows you to map one value of the wf env to another parameter of the same wf env, allowing to wave parameters from node to node.
This can be useful when different nodes of the same workflow want to use the same value but they are coded to read it from different env parameters.
Example:

<PARAM required="true" type="string" name="envParams" managedBy="system">
  { 
     'path' : 'rottenRecordsPath'
  }
</PARAM>

The value of envParams is a list of parameter mappings. In the example we have 1 mapping: the value in the env parameter named "rottenRecordsPath" is copied into another env parameter named "path".
Given the above configuration, a node will be able to find the value it needs in the expected env parameter ("path").

sysParams

sysParams is a special parameter that allows you to set a parameter with a value coming from a system property (those you set in *.properties files).
Example:

<PARAM required="true" type="string" name="sysParams" managedBy="system">
{     
   'hbase.mapred.inputtable' : 'hbase.mapred.datatable', 
   'hbase.mapreduce.inputtable' : 'hbase.mapred.datatable'
}
</PARAM>

The value of sysParams is a list of parameter mappings. In the example we have 2 mapping: the value of the system property "hbase.mapred.datatable" will be copied into two new parameters: "hbase.mapred.inputtable" and "hbase.mapreduce.inputtable".

Re-using existing job node beans

Often you will need generic nodes in your workflow that have already been implemented and used.
before you implement a new node for a generic feature, it is better to check here on where is my bean and filter by "wfNode".

where is my bean will tell you which module and which application context defines the bean you need.
Consider that the service looks into the whole dnet40/modules svn directory, hence you will usually find several entries for the same bean.

Typically, you'll need beans located in dnet-msro-service and dnet-*-workflows (e.g. dnet-opnaireplus-workflows for the OpenAIRE project).

CNR suggests you to check out those modules from svn so you can quickly refer to node implementations, if you need it.

Meta-Workflow definition

A meta-workflow implements a high-level functionality by composing workflows.
In the simplest scenario, one meta-workflow refers to only one workflow, as follows:

<RESOURCE_PROFILE>
    <HEADER>
        <RESOURCE_IDENTIFIER
            value="metaWF-UUID_TWV0YVdvcmtmbG93RFNSZXNvdXJjZXMvTWV0YVdvcmtmbG93RFNSZXNvdXJjZVR5cGU="/>
        <RESOURCE_TYPE value="MetaWorkflowDSResourceType"/>
        <RESOURCE_KIND value="MetaWorkflowDSResources"/>
        <RESOURCE_URI value=""/>
        <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/>
    </HEADER>
    <BODY>
        <METAWORKFLOW_NAME family="tutorial">Sleeping</METAWORKFLOW_NAME>
        <METAWORKFLOW_DESCRIPTION>A sample meta-wf</METAWORKFLOW_DESCRIPTION>
        <METAWORKFLOW_SECTION>Tutorial</METAWORKFLOW_SECTION>
        <ADMIN_EMAIL>wf-admin-1@wf.com,wf-admin-2@wf.com</ADMIN_EMAIL>
        <CONFIGURATION status="EXECUTABLE">
            <WORKFLOW
                id="wf-profileID" name="Sleeping Beauty"/>
        </CONFIGURATION>
        <SCHEDULING enabled="false">
            <CRON>29 5 22 ? * *</CRON>
            <MININTERVAL>10080</MININTERVAL>
        </SCHEDULING>
    </BODY>
</RESOURCE_PROFILE>
Let's have a look at the information in the BODY element:
  • METAWORKFLOW_NAME, family, METAWORKFLOW_DESCRIPTION, and METAWORKFLOW_SECTION, : guide the UI for a correct visualization
  • ADMIN_EMAIL: comma separated list of emails. Put here the emails of people that want to receive emails about the failure and completion of the workflows included in this meta-workflow.
  • WORKFLOW: reference to an existing workflow
  • SCHEDULING:
    • enabled: set to true if you want the meta-workflow to run automatically according to the given schedule
    • CRON: cron expression to schedule the meta-workflow execution
    • MININTERVAL: milliseconds (?) between two subsequent runs of the meta-workflow.

More complex meta-workflows may include different workflows running in sequence, in parallel, or a mix of them.
The data provision meta-workflow of OpenAIRE is a good example of a complex meta-workflow. It composes a total of 10 different workflows, where workflows profileID2, profileID3, profileID4, profileID5 runs in parallel. Note that the parallel/sequence composition is defined by the level of nesting of the WORKFLOW XML element.

<CONFIGURATION status="EXECUTABLE">
    <WORKFLOW id="profileID1" name="reset">
        <WORKFLOW id="profileID2" name="db2hbase"/>
        <WORKFLOW id="profileID3" name="oaf2hbase"/>
        <WORKFLOW id="profileID4" name="odf2hbase"/>
        <WORKFLOW id="profileID5" name="actions2hbase">
            <WORKFLOW id="profileID6" name="deduplication organizations">
                <WORKFLOW id="profileID7" name="deduplication persons">
                    <WORKFLOW id="profileID8" name="deduplication results">
                        <WORKFLOW id="profileID9" name="update provision">
                            <WORKFLOW id="profileID10" name="prepare pre-public info space"/>
                        </WORKFLOW>
                    </WORKFLOW>
                </WORKFLOW>
            </WORKFLOW>
        </WORKFLOW>
    </WORKFLOW>
</CONFIGURATION>

Workflow use case: metadata aggregation

COMING SOON

Updated by Claudio Atzori about 9 years ago · 16 revisions