Alessia Bardi, 06/02/2015 06:02 PM
created section for meta-wf definition


Creating a D-Net Workflow

Types of workflows:
  • collection workflows: systematically collect metadata or files from data sources;
  • processing workflows: implement the sequence of steps needed to stage the collected metadata (or files) and form the target Information Spaces.

Defining a collection workflow

COMING SOON

Definition of a processing workflow

Defining a processing workflow consists of creating at least two D-Net profiles:
  1. One or more workflows (WorkflowDSResourceType): each defines the steps to execute, in sequence or in parallel.
  2. One meta-workflow (MetaWorkflowDSResourceType): it describes a set of related workflows that run in parallel or in sequence to accomplish a high-level functionality.
    • For example, to export the Information Space via OAI-PMH we first have to feed the OAI-PMH back-end (workflow 1: "OAI store feed"); then we can configure the back-end with the needed indexes (workflow 2: "OAI Post feed").

Workflow definition

Let's see a simple example of a workflow with only one step that performs a sleep:

<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <HEADER>
        <RESOURCE_IDENTIFIER value="wf-UUID_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
        <RESOURCE_KIND value="WorkflowDSResources"/>
        <RESOURCE_URI value=""/>
        <DATE_OF_CREATION value="2015-02-06T11:13:51.0Z"/>
    </HEADER>
    <BODY>
        <WORKFLOW_NAME>Sleeping Beauty</WORKFLOW_NAME>
        <WORKFLOW_TYPE>Tutorial</WORKFLOW_TYPE>
        <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
        <CONFIGURATION start="manual">
            <NODE name="sleep" isStart="true" type="Sleep" >
                <DESCRIPTION>Sleep for a while</DESCRIPTION>
                <PARAMETERS>
                    <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
                </PARAMETERS>
                <ARCS>
                    <ARC to="success"/>
                </ARCS>
            </NODE>
        </CONFIGURATION>
        <STATUS/>
    </BODY>
</RESOURCE_PROFILE>

The CONFIGURATION element of the profile tells us that the workflow is composed of one step.
The step is defined by the NODE element:
  • name="sleep": the name that will appear in the UI when showing the workflow
  • isStart="true": marks the node as a start node. Each workflow must have at least one start node.
  • type="Sleep": the type of the node. Through this value we refer to a Java bean with name wfNodeSleep, whose class implements the business logic. Therefore we expect a bean like the following to be defined in an application-context XML:
    <bean id="wfNodeSleep" class="eu.dnetlib.msro.tutorial.SleepJobNode" scope="prototype"/>
    

    Note that the attribute scope="prototype" is important because it tells the Spring framework to create a new bean instance every time that bean is requested.
    We'll look at the actual SleepJobNode class shortly.
  • DESCRIPTION: brief description of the node's logic. It will appear in the UI when showing the workflow.
  • PARAMETERS: parameters needed by the node. A parameter (PARAM) has the following attributes:
    • name and type: name and type of the param. The Java class must define a property (accessible via a public getter/setter) with the same name and type.
    • required: tells whether the param is required. If it is, the workflow can run only if the param is set.
    • managedBy: who is in charge of setting the value. Allowed values are:
      • user: the UI lets the user set the value
      • system: the UI does not let the user set the value, as the system is expected to set the parameter automatically (more on this in a while)
    • More attributes are available. TODO: add other optional attributes for parameter (e.g. function)
  • ARCS: which node must be executed after this node. In the example, we simply go to the success node, a pre-defined node that completes the workflow successfully. If there is an exception in one node, the pre-defined failure node is executed automatically.
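
The conventions in the list above (bean name derived from the node type, PARAM values pushed through public setters) can be sketched in plain Java. This is not D-Net code: NodeParamBinder, beanNameFor, bind and DemoSleepNode are hypothetical names, the "wfNode" prefix is inferred from the single example above, and in a real deployment Spring performs the instantiation. The sketch only illustrates why the class needs a setter whose name and type match the PARAM.

```java
// Hypothetical helper, not part of D-Net: illustrates the naming and setter conventions only.
final class NodeParamBinder {

    // Assumption drawn from the example: type="Sleep" resolves to the bean "wfNodeSleep".
    static String beanNameFor(String nodeType) {
        return "wfNode" + nodeType;
    }

    // Calls the public setter matching a PARAM name; only "int" and string values are handled here.
    static void bind(Object node, String name, String type, String value) {
        String setter = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            if ("int".equals(type)) {
                node.getClass().getMethod(setter, int.class).invoke(node, Integer.parseInt(value.trim()));
            } else {
                node.getClass().getMethod(setter, String.class).invoke(node, value);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No usable setter " + setter, e);
        }
    }

    // Stand-in for a job node class; it has no D-Net dependencies.
    public static class DemoSleepNode {
        private int sleepTimeMs;
        public int getSleepTimeMs() { return sleepTimeMs; }
        public void setSleepTimeMs(int time) { sleepTimeMs = time; }
    }

    public static void main(String[] args) {
        DemoSleepNode node = new DemoSleepNode();
        bind(node, "sleepTimeMs", "int", "1000");
        System.out.println(beanNameFor("Sleep") + " sleepTimeMs=" + node.getSleepTimeMs());
    }
}
```

If the class lacked setSleepTimeMs(int), the bind call would fail, which mirrors the requirement that the property be reachable through a public setter.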

Now let's see how the business logic of the node is implemented in the Java class eu.dnetlib.msro.tutorial.SleepJobNode:

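package eu.dnetlib.msro.tutorial;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

public class SleepJobNode extends SimpleJobNode {
   // set from the PARAM named "sleepTimeMs" through the setter below
   private int sleepTimeMs;

   @Override
   protected String execute(final NodeToken token) throws Exception {
      // removed exception handling for readability
      Thread.sleep(sleepTimeMs);
      // follow the default (unnamed) arc declared in the workflow profile
      return Arc.DEFAULT_ARC;
   }

   public int getSleepTimeMs() { return sleepTimeMs; }
   public void setSleepTimeMs(int time) { sleepTimeMs = time; }
}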

SleepJobNode extends the class eu.dnetlib.msro.workflows.nodes.SimpleJobNode, which is a D-Net class extending Sarasvati's com.googlecode.sarasvati.mem.MemNode. The method to override is
    abstract protected String execute(final NodeToken token) throws Exception;
which must return the name of the arc to follow.
Through this mechanism you can steer the workflow along one path or another depending on what happens during the execution of the current step.

For example, let's suppose we want to:
  • sleep and go to success if sleepTimeMs < 2000
  • sleep and go to failure if sleepTimeMs >= 2000

Our class should change to something like:

public class SleepJobNode extends SimpleJobNode{
   private int sleepTimeMs;

   @Override
   protected String execute(final NodeToken token) throws Exception {
      //removed exception handling for readability
      Thread.sleep(sleepTimeMs);
      if(sleepTimeMs < 2000) return "ok";
      else return "oops";
   }

   public int getSleepTimeMs(){ return sleepTimeMs;}
   public void setSleepTimeMs(int time){ sleepTimeMs = time;}
}

and the workflow should be adapted to declare the two arcs, "ok" and "oops":

<NODE name="sleep" isStart="true" type="Sleep" >
   <DESCRIPTION>Sleep for a while</DESCRIPTION>
   <PARAMETERS>
      <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
   </PARAMETERS>
   <ARCS>
      <ARC name="ok" to="success"/>
      <ARC name="oops" to="failure"/>
   </ARCS>
</NODE>
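
How a returned arc name selects the next node can be pictured with a small stand-alone sketch. It is not the Sarasvati engine: ArcRouter and its methods are hypothetical, the empty string stands in for an ARC declared without a name, and rejecting an unknown arc name is a choice of this sketch rather than documented engine behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the Sarasvati engine: resolves the arc name returned by a node to its target.
final class ArcRouter {
    private static final String DEFAULT = "";   // stand-in key for an <ARC> without a name
    private final Map<String, String> targets = new HashMap<String, String>();

    // Registers one <ARC name="..." to="..."/> declaration.
    ArcRouter arc(String name, String to) {
        targets.put(name == null ? DEFAULT : name, to);
        return this;
    }

    // Returns the target node for the arc name produced by execute().
    String next(String returnedArc) {
        String to = targets.get(returnedArc == null ? DEFAULT : returnedArc);
        if (to == null) {
            throw new IllegalArgumentException("No arc named " + returnedArc);
        }
        return to;
    }

    // Mirrors the branching rule of the example class above.
    static String arcFor(int sleepTimeMs) {
        return sleepTimeMs < 2000 ? "ok" : "oops";
    }

    public static void main(String[] args) {
        ArcRouter router = new ArcRouter().arc("ok", "success").arc("oops", "failure");
        System.out.println(router.next(arcFor(1000)));
        System.out.println(router.next(arcFor(2500)));
    }
}
```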

D-Net offers three JobNode superclasses to choose from:
  • eu.dnetlib.msro.workflows.nodes.SimpleJobNode: sync execution. It runs the execute method in the main thread.
  • eu.dnetlib.msro.workflows.nodes.AsyncJobNode: async execution. It runs the execute method in a separate thread.
  • eu.dnetlib.msro.workflows.nodes.BlackboardJobNode: use this class when your node must perform a blackboard request to a D-Net service.
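
The difference between the sync and async styles can be illustrated without any D-Net class. The sketch below is only an analogy: ExecutionModes, runSync and runAsync are hypothetical names, and the way AsyncJobNode actually manages its thread is not shown in this page.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not the D-Net base classes: contrasts the two execution styles.
final class ExecutionModes {

    // Sync style: the step runs on the thread that calls it.
    static String runSync(Callable<String> step) {
        try {
            return step.call();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Async style: the step runs on a separate worker thread.
    // The caller waits for the result here only to keep the demo deterministic.
    static String runAsync(Callable<String> step) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(step).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        // The step simply reports which thread executed it.
        Callable<String> step = () -> Thread.currentThread().getName();
        System.out.println("sync ran on:  " + runSync(step));
        System.out.println("async ran on: " + runAsync(step));
    }
}
```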

Extending BlackboardJobNode

D-Net services can implement the BlackBoard (BB) protocol, which allows them to receive messages.
If a message is known to a service, the service will execute the corresponding action and notify the caller about its execution status by updating the blackboard in the service profile. The management of reading/writing BB messages is implemented in the BlackboardJobNode class, so that you only have to:
  • Specify the profile id of the service you want to call by overriding the method
    abstract protected String obtainServiceId(NodeToken token);
  • Prepare the BB message by overriding the method
    abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;

Let's suppose we have a service accepting the BB message "SLEEP" that requires a parameter named sleepTimeMs.
Our SleepJobNode class becomes:

public class SleepJobNode extends BlackboardJobNode{
   private String xqueryForServiceId;

   private int sleepTimeMs;

   @Override
   protected String obtainServiceId(final NodeToken token) {
       List<String> serviceIds = getServiceLocator().getService(ISLookUpService.class).quickSearchProfile(xqueryForServiceId);
       if (serviceIds.size() < 1) throw new RuntimeException("Service id not found using query: " + xqueryForServiceId);
       return serviceIds.get(0);
   }   

   @Override
   protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
      job.setAction("SLEEP");
      job.getParameters().put("sleepTimeMs", String.valueOf(sleepTimeMs)); // pass the value as a string
   }

   public int getSleepTimeMs(){ return sleepTimeMs;}
   public void setSleepTimeMs(int time){ sleepTimeMs = time;}
   public String getXqueryForServiceId() { return xqueryForServiceId;}
   public void setXqueryForServiceId(final String xqueryForServiceId) { this.xqueryForServiceId = xqueryForServiceId;}
}

And the workflow node must declare a new parameter for the xqueryForServiceId:

<NODE name="sleep" isStart="true" type="Sleep">
    <DESCRIPTION>Sleep for a while</DESCRIPTION>
    <PARAMETERS>
        <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
        <PARAM name="xqueryForServiceId" managedBy="user" type="string" required="true">/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='SleepServiceResourceType']/HEADER/RESOURCE_IDENTIFIER/@value/string()</PARAM>
    </PARAMETERS>
    <ARCS>
        <ARC to="success"/>
    </ARCS>
</NODE>
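
The exchange between the node and the service can be pictured with a toy model of a blackboard message. Everything in it is hypothetical (BlackboardDemo, Message, the three status values); the real message format and status handling live in BlackboardJobNode and in the service profile, and are not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of a blackboard exchange; it does not use any D-Net class.
final class BlackboardDemo {
    enum Status { ASSIGNED, DONE, FAILED }

    // A message carries an action name, string parameters and the execution status.
    static final class Message {
        final String action;
        final Map<String, String> parameters = new HashMap<String, String>();
        Status status = Status.ASSIGNED;

        Message(String action) {
            this.action = action;
        }
    }

    // Toy service: executes the actions it knows and records the outcome on the message.
    static void handle(Message msg) {
        if ("SLEEP".equals(msg.action) && msg.parameters.containsKey("sleepTimeMs")) {
            try {
                Thread.sleep(Long.parseLong(msg.parameters.get("sleepTimeMs")));
                msg.status = Status.DONE;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                msg.status = Status.FAILED;
            }
        } else {
            // Unknown action or missing parameter.
            msg.status = Status.FAILED;
        }
    }

    public static void main(String[] args) {
        Message msg = new Message("SLEEP");
        msg.parameters.put("sleepTimeMs", "10");
        handle(msg);
        System.out.println(msg.action + " -> " + msg.status);
    }
}
```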

But what if one of your params should not be set by an end user, but should rather be picked from the workflow env?
Let's suppose this is the case for the xquery. Then the SleepJobNode class should expect to receive the name of the env parameter that holds the actual xquery to use, and the node declares that name as a system-managed parameter:

public class SleepJobNode extends BlackboardJobNode{
   private String xqueryForServiceIdParam;
   . . . 
   @Override
   protected String obtainServiceId(final NodeToken token) {
       final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam());
       List<String> serviceIds = getServiceLocator().getService(ISLookUpService.class).quickSearchProfile(xquery);
       . . .
   }   
. . . 

<NODE name="sleep" isStart="true" type="Sleep">
    <DESCRIPTION>Sleep for a while</DESCRIPTION>
    <PARAMETERS>
        <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM>
        <PARAM managedBy="system" name="xqueryForServiceIdParam" required="true" type="string">xqueryForServiceId_in_env</PARAM>
    </PARAMETERS>
    <ARCS>
        <ARC to="success"/>
    </ARCS>
</NODE>
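
The indirection used here (the PARAM holds the name of an env attribute, not the value itself) can be reduced to a two-step lookup. The sketch below uses a plain map in place of the workflow env; EnvIndirection and resolve are hypothetical names, and failing fast on a missing attribute is a choice of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a plain map stands in for the workflow env.
final class EnvIndirection {

    // Looks up the env attribute whose name was configured on the node.
    static String resolve(Map<String, String> env, String envParamName) {
        String value = env.get(envParamName);
        if (value == null) {
            throw new IllegalStateException("No env attribute named " + envParamName);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<String, String>();
        // An earlier node is assumed to have stored the xquery under this name.
        env.put("xqueryForServiceId_in_env", "/RESOURCE_PROFILE/HEADER/RESOURCE_IDENTIFIER/@value/string()");
        // The node only knows the name of the attribute, taken from its PARAM.
        System.out.println(resolve(env, "xqueryForServiceId_in_env"));
    }
}
```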

Special parameters

Note that in case of parameter name clashes, the precedence is sysparams < envparams < params: the params of the XML node override the params in the env, which in turn override the sys params.
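
One way to picture this precedence is as three maps merged from lowest to highest priority, so that later sources overwrite earlier ones. ParamPrecedence and merge are hypothetical names; the sketch says nothing about how D-Net stores the three sources internally.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the precedence rule sysparams < envparams < params.
final class ParamPrecedence {

    // Later putAll calls overwrite earlier ones, so node params win over env, and env over sys.
    static Map<String, String> merge(Map<String, String> sysParams,
                                     Map<String, String> envParams,
                                     Map<String, String> nodeParams) {
        Map<String, String> effective = new HashMap<String, String>();
        effective.putAll(sysParams);
        effective.putAll(envParams);
        effective.putAll(nodeParams);
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> sys = new HashMap<String, String>();
        Map<String, String> env = new HashMap<String, String>();
        Map<String, String> node = new HashMap<String, String>();
        sys.put("path", "/from/sys");
        env.put("path", "/from/env");
        node.put("path", "/from/node");
        System.out.println(merge(sys, env, node).get("path"));
    }
}
```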

envParams

envParams is a special parameter that allows you to map one value of the wf env to another parameter of the same wf env, so that values can be passed from node to node.
This is useful when different nodes of the same workflow need the same value but are coded to read it from different env parameters.
Example:

<PARAM required="true" type="string" name="envParams" managedBy="system">
  { 
     'path' : 'rottenRecordsPath'
  }
</PARAM>

The value of envParams is a list of parameter mappings. In the example we have 1 mapping: the value in the env parameter named "rottenRecordsPath" is copied into another env parameter named "path".
Given the above configuration, a node will be able to find the value it needs in the expected env parameter ("path").

sysParams

sysParams is a special parameter that allows you to set a parameter with a value coming from a system property (those you set in *.properties files).
Example:

<PARAM required="true" type="string" name="sysParams" managedBy="system">
{     
   'hbase.mapred.inputtable' : 'hbase.mapred.datatable', 
   'hbase.mapreduce.inputtable' : 'hbase.mapred.datatable'
}
</PARAM>

The value of sysParams is a list of parameter mappings. In the example we have 2 mappings: the value of the system property "hbase.mapred.datatable" will be copied into two new parameters: "hbase.mapred.inputtable" and "hbase.mapreduce.inputtable".
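
Both envParams and sysParams follow the same "target : source" convention: the key is the parameter to create and the value names where to read from. A minimal sketch of that copy step, assuming the mappings have already been parsed into a map (SpecialParams and its methods are hypothetical, and skipping a missing source is a choice of this sketch):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch, not D-Net code: applies "target : source" mappings.
final class SpecialParams {

    // envParams: copies env[source] into env[target] for every mapping.
    static void applyEnvParams(Map<String, String> env, Map<String, String> mappings) {
        for (Map.Entry<String, String> m : mappings.entrySet()) {
            String value = env.get(m.getValue());
            if (value != null) {
                env.put(m.getKey(), value);
            }
        }
    }

    // sysParams: copies the system property named by the mapping value into params[target].
    static void applySysParams(Map<String, String> params, Properties props, Map<String, String> mappings) {
        for (Map.Entry<String, String> m : mappings.entrySet()) {
            String value = props.getProperty(m.getValue());
            if (value != null) {
                params.put(m.getKey(), value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<String, String>();
        env.put("rottenRecordsPath", "/tmp/rotten");
        Map<String, String> envMappings = new HashMap<String, String>();
        envMappings.put("path", "rottenRecordsPath");
        applyEnvParams(env, envMappings);
        System.out.println("path=" + env.get("path"));
    }
}
```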

Meta-Workflow definition

A meta-workflow implements a high-level functionality by composing workflows.
In the simplest scenario, one meta-workflow refers to only one workflow, as follows:

<RESOURCE_PROFILE>
    <HEADER>
        <RESOURCE_IDENTIFIER
            value="metaWF-UUID_TWV0YVdvcmtmbG93RFNSZXNvdXJjZXMvTWV0YVdvcmtmbG93RFNSZXNvdXJjZVR5cGU="/>
        <RESOURCE_TYPE value="MetaWorkflowDSResourceType"/>
        <RESOURCE_KIND value="MetaWorkflowDSResources"/>
        <RESOURCE_URI value=""/>
        <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/>
    </HEADER>
    <BODY>
        <METAWORKFLOW_NAME family="tired">Sleeping</METAWORKFLOW_NAME>
        <METAWORKFLOW_DESCRIPTION>Sleeping when tired</METAWORKFLOW_DESCRIPTION>
        <METAWORKFLOW_SECTION>Sleeping section</METAWORKFLOW_SECTION>
        <ADMIN_EMAIL>wf-admin-1@wf.com,wf-admin-2@wf.com</ADMIN_EMAIL>
        <CONFIGURATION status="EXECUTABLE">
            <WORKFLOW
                id="wf-UUID_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" 
                name="Sleeping Beauty"/>
        </CONFIGURATION>
        <SCHEDULING enabled="false">
            <CRON>29 5 22 ? * *</CRON>
            <MININTERVAL>10080</MININTERVAL>
        </SCHEDULING>
    </BODY>
</RESOURCE_PROFILE>

Let's have a look at the information in the BODY element:
  • METAWORKFLOW_NAME, family, METAWORKFLOW_DESCRIPTION, and METAWORKFLOW_SECTION: guide the UI for a correct visualization
  • ADMIN_EMAIL: comma-separated list of email addresses. List here the people who want to be notified by email about the failure and completion of the workflows included in this meta-workflow.
  • WORKFLOW: reference to an existing workflow
  • SCHEDULING:
    • enabled: set to true if you want the meta-workflow to run automatically according to the given schedule
    • CRON: cron expression to schedule the meta-workflow execution
    • MININTERVAL: minimum interval between two subsequent runs of the meta-workflow. The unit is not confirmed here (milliseconds?); note that the example value 10080 equals the number of minutes in one week.

More complex meta-workflows may include different workflows running in sequence, in parallel, or a mix of them.
The data provision meta-workflow of OpenAIRE is a good example of a complex meta-workflow. It composes a total of 10 different workflows, where workflows uuid2, uuid3, uuid4 and uuid5 run in parallel. Note that the parallel/sequence composition is defined by the level of nesting of the WORKFLOW XML element.

<CONFIGURATION status="EXECUTABLE">
    <WORKFLOW id="uuid1_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="reset">
        <WORKFLOW id="uuid2_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="db2hbase"/>
        <WORKFLOW id="uuid3_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="oaf2hbase"/>
        <WORKFLOW id="uuid4_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="odf2hbase"/>
        <WORKFLOW id="uuid5_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="actions2hbase">
            <WORKFLOW id="uuid6_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="deduplication organizations">
                <WORKFLOW id="uuid7_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="deduplication persons">
                    <WORKFLOW id="uuid8_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="deduplication results">
                        <WORKFLOW id="uuid9_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="update provision">
                            <WORKFLOW id="uuid10_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="prepare pre-public info space"/>
                        </WORKFLOW>
                    </WORKFLOW>
                </WORKFLOW>
            </WORKFLOW>
        </WORKFLOW>
    </WORKFLOW>
</CONFIGURATION>
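
To make the nesting rule concrete, the sketch below models the WORKFLOW elements as a tree and groups the names by nesting depth: siblings at the same depth are the candidates for parallel execution, and a nested element starts after its parent. MetaWorkflowTree and byDepth are hypothetical names. Whether a nested workflow also waits for the siblings of its parent is not stated in this page, so the sketch only reports the grouping and does not schedule anything.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: models nested WORKFLOW elements as a tree.
final class MetaWorkflowTree {
    final String name;
    final List<MetaWorkflowTree> children;

    MetaWorkflowTree(String name, MetaWorkflowTree... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }

    // Groups workflow names by nesting depth: index 0 holds the outermost WORKFLOW elements.
    static List<List<String>> byDepth(List<MetaWorkflowTree> roots) {
        List<List<String>> levels = new ArrayList<List<String>>();
        List<MetaWorkflowTree> current = roots;
        while (!current.isEmpty()) {
            List<String> names = new ArrayList<String>();
            List<MetaWorkflowTree> next = new ArrayList<MetaWorkflowTree>();
            for (MetaWorkflowTree wf : current) {
                names.add(wf.name);
                next.addAll(wf.children);
            }
            levels.add(names);
            current = next;
        }
        return levels;
    }

    public static void main(String[] args) {
        // First levels of the example above; the deeper chain is shortened.
        MetaWorkflowTree reset = new MetaWorkflowTree("reset",
                new MetaWorkflowTree("db2hbase"),
                new MetaWorkflowTree("oaf2hbase"),
                new MetaWorkflowTree("odf2hbase"),
                new MetaWorkflowTree("actions2hbase",
                        new MetaWorkflowTree("deduplication organizations")));
        List<List<String>> levels = byDepth(Arrays.asList(reset));
        for (int depth = 0; depth < levels.size(); depth++) {
            System.out.println("depth " + depth + ": " + levels.get(depth));
        }
    }
}
```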

Updated by Alessia Bardi about 9 years ago · 9 revisions