Creating a D-Net Workflow » History » Revision 15

Revision 14 (Alessia Bardi, 06/02/2015 06:23 PM) → Revision 15/16 (Alessia Bardi, 06/02/2015 06:26 PM)

h1. Creating a D-Net Workflow 

 There are two types of workflows: 
 * collection workflows: perform the systematic collection of metadata or files from data sources; 
 * processing workflows: implement the sequence of steps needed to stage collected metadata (or files) and form the target Information Spaces. 

 h2. Defining a collection workflow 

 COMING SOON 

 h2. Definition of a processing workflow 

 The definition of a processing workflow consists of creating at least two D-Net profiles: 
 # One (or more) [[Creating a D-Net Workflow#Workflow definition|workflow]] (@WorkflowDSResourceType@): it defines the steps to execute (in sequence or in parallel). 
 # One [[Creating a D-Net Workflow#Meta-Workflow definition|meta-workflow]] (@MetaWorkflowDSResourceType@): it describes a set of related workflows that can run in parallel or in sequence to accomplish a high-level functionality.  
 #* For example, in order to export the Information Space via OAI-PMH we first have to feed the OAI-PMH back-end (workflow 1: "OAI store feed"); then we can configure the back-end with the needed indexes (workflow 2: "OAI Post feed"). 
 
 h3. Workflow definition 

 Let's see a simple example of a workflow with only one step that performs a sleep: 
 <pre> 
 <?xml version="1.0" encoding="UTF-8"?> 
 <RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> 
     <HEADER> 
         <RESOURCE_IDENTIFIER value="wfUUID_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/> 
         <RESOURCE_TYPE value="WorkflowDSResourceType"/> 
         <RESOURCE_KIND value="WorkflowDSResources"/> 
         <RESOURCE_URI value=""/> 
         <DATE_OF_CREATION value="2015-02-06T11:13:51.0Z"/> 
     </HEADER> 
     <BODY> 
         <WORKFLOW_NAME>Sleeping Beauty</WORKFLOW_NAME> 
         <WORKFLOW_TYPE>Tutorial</WORKFLOW_TYPE> 
         <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY> 
         <CONFIGURATION start="manual"> 
             <NODE name="sleep" isStart="true" type="Sleep" > 
                 <DESCRIPTION>Sleep for a while</DESCRIPTION> 
                 <PARAMETERS> 
                     <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM> 
                 </PARAMETERS> 
                 <ARCS> 
                     <ARC to="success"/> 
                 </ARCS> 
             </NODE> 
         </CONFIGURATION> 
         <STATUS/> 
     </BODY> 
 </RESOURCE_PROFILE> 
 </pre>  

 The @CONFIGURATION@ element of the profile tells us that the workflow is composed of one step. 
 The step is defined by the @NODE@ element: 
 * @name="sleep"@: the name that will appear in the UI when showing the workflow 
 * @isStart="true"@: marks the node as a start node. Each workflow must have at least one start node 
 * @type="Sleep"@: the type of the node. Through this value we refer to a Java bean with name @wfNodeSleep@, whose class implements the business logic. Therefore we expect a bean like the following to be defined in an application-context XML: 
 <pre> 
 <bean id="wfNodeSleep" class="eu.dnetlib.msro.tutorial.SleepJobNode" scope="prototype"/> 
 </pre> 
 Note that the attribute @scope="prototype"@ is important because it tells the Spring framework to create a new instance of the bean every time that specific bean is requested.  
 We'll see how the actual class @SleepJobNode@ is implemented in a while. 
 * @DESCRIPTION@: brief description of the logic of the node. It will appear in the UI when showing the workflow 
 * @PARAMETERS@: parameters needed by the node. A parameter (@PARAM@) has the following attributes: 
 ** @name@ and @type@: name and type of the param. The Java class must define a property (accessible via public getter/setter) with the same name and type. 
 ** @required@: tells if the param is required or not. If it is, then the workflow can be run only if the param is set. 
 ** @managedBy@: who's in charge of setting the value. Allowed values are: 
 *** @user@: the UI will enable the user to set the value 
 *** @system@: the UI will not enable the user to set the value, as the system is expected to set the parameter automatically (more on this in a while) 
 ** More attributes are available. TODO: add other optional attributes for parameter (e.g. @function@) 
 * @ARCS@: which node must be executed after this node. In the example, we simply go to the @success@ node, a pre-defined node that completes the workflow successfully. If there is an exception in one node, the pre-defined @failure@ node is executed automatically.  
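
The conventions listed above (bean name derived from the node type, one public setter per parameter) can be pictured with a minimal, standalone sketch. It is not D-Net's or Spring's actual wiring code: the class @NodeBindingSketch@, its methods and the nested @DemoNode@ are hypothetical names introduced for illustration. The only facts taken from the text are the @wfNode@ prefix and the requirement of a property with the same name and type as the @PARAM@.

```java
import java.beans.IntrospectionException;
import java.beans.PropertyDescriptor;
import java.lang.reflect.InvocationTargetException;

// Illustrative only: shows the naming and setter conventions described above,
// not how D-Net or Spring actually wire a node.
class NodeBindingSketch {

    // Hypothetical stand-in for a job node with one int property.
    public static class DemoNode {
        private int sleepTimeMs;
        public int getSleepTimeMs() { return sleepTimeMs; }
        public void setSleepTimeMs(int sleepTimeMs) { this.sleepTimeMs = sleepTimeMs; }
    }

    // type="Sleep" in the profile refers to the bean named "wfNodeSleep".
    static String beanNameFor(String nodeType) {
        return "wfNode" + nodeType;
    }

    // Converts the PARAM text according to its declared type and calls the matching setter.
    static void applyParam(Object node, String name, String type, String rawValue)
            throws IntrospectionException, IllegalAccessException, InvocationTargetException {
        Object value;
        switch (type) {
            case "int": value = Integer.valueOf(rawValue.trim()); break;
            case "boolean": value = Boolean.valueOf(rawValue.trim()); break;
            default: value = rawValue; // assumption: anything else is treated as a string
        }
        // PropertyDescriptor resolves getSleepTimeMs/setSleepTimeMs for the name "sleepTimeMs".
        PropertyDescriptor property = new PropertyDescriptor(name, node.getClass());
        property.getWriteMethod().invoke(node, value);
    }

    public static void main(String[] args) throws Exception {
        DemoNode node = new DemoNode();
        applyParam(node, "sleepTimeMs", "int", "1000");
        System.out.println(beanNameFor("Sleep") + " sleepTimeMs=" + node.getSleepTimeMs());
    }
}
```

If the node class lacked a getter or setter for a declared parameter, the @PropertyDescriptor@ constructor in this sketch would fail, which mirrors why the property must be exposed with the same name as the @PARAM@.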

 Now let's see how the business logic of the node is implemented in the Java class @eu.dnetlib.msro.tutorial.SleepJobNode@: 
 <pre> 
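 package eu.dnetlib.msro.tutorial;

 import com.googlecode.sarasvati.Arc;
 import com.googlecode.sarasvati.NodeToken;
 import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

 public class SleepJobNode extends SimpleJobNode { 
    // Filled in from the PARAM named "sleepTimeMs" of the workflow profile
    private int sleepTimeMs; 
   
    @Override 
    protected String execute(final NodeToken token) throws Exception { 
       // Exception handling removed for readability 
       Thread.sleep(sleepTimeMs); 
       // Follow the (only) unnamed arc declared by the node
       return Arc.DEFAULT_ARC; 
    } 

    public int getSleepTimeMs() { return sleepTimeMs; } 
    public void setSleepTimeMs(int time) { sleepTimeMs = time; } 
 } 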
 </pre> 

 @SleepJobNode@ extends the class @eu.dnetlib.msro.workflows.nodes.SimpleJobNode@, which is a D-Net class extending Sarasvati's @com.googlecode.sarasvati.mem.MemNode@. The method to override is 
 <pre>abstract protected String execute(final NodeToken token) throws Exception;</pre>  
 which must return the name of the arc to follow.  
 Through this mechanism you can steer the workflow along one path or another, depending on what happens during the execution of the current step. 

 For example, let's suppose we want to:  
 * sleep and go to success if sleepTimeMs < 2000  
 * sleep and go to failure if sleepTimeMs >= 2000 

 Our class should change to something like: 
 <pre> 
 public class SleepJobNode extends SimpleJobNode{ 
    private int sleepTimeMs; 
   
    @Override 
    protected String execute(final NodeToken token) throws Exception { 
       //removed exception handling for readability 
       Thread.sleep(sleepTimeMs); 
       if(sleepTimeMs < 2000) return "ok"; 
       else return "oops"; 
    } 

    public int getSleepTimeMs(){ return sleepTimeMs;} 
    public void setSleepTimeMs(int time){ sleepTimeMs = time;} 
 } 
 </pre> 

 The workflow must then be adapted to declare the two different arcs, "ok" and "oops": 
 <pre> 
 <NODE name="sleep" isStart="true" type="Sleep" > 
    <DESCRIPTION>Sleep for a while</DESCRIPTION> 
    <PARAMETERS> 
       <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM> 
    </PARAMETERS> 
    <ARCS> 
       <ARC name="ok" to="success"/> 
       <ARC name="oops" to="failure"/> 
    </ARCS> 
 </NODE> 
 </pre> 
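
The relationship between the string returned by @execute@ and the @ARC@ elements can be modelled with a small standalone sketch. It does not use Sarasvati or D-Net classes, and every name in it (@ArcRoutingSketch@, @chooseArc@, @nextNode@) is hypothetical. Treating an undeclared arc name as a configuration error is an assumption of the sketch, not documented engine behaviour.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: a standalone model of arc-based routing.
class ArcRoutingSketch {

    // Mirrors the decision taken in execute(): the returned string is an arc name.
    static String chooseArc(int sleepTimeMs) {
        return sleepTimeMs < 2000 ? "ok" : "oops";
    }

    // Mirrors the ARCS element of the node: arc name -> target node.
    static Map<String, String> arcsOfSleepNode() {
        Map<String, String> arcs = new LinkedHashMap<>();
        arcs.put("ok", "success");
        arcs.put("oops", "failure");
        return arcs;
    }

    // Resolves the next node for the given arc name.
    static String nextNode(Map<String, String> arcs, String arcName) {
        String target = arcs.get(arcName);
        if (target == null) {
            // Assumption of this sketch: an undeclared arc name is a configuration error.
            throw new IllegalStateException("No arc named '" + arcName + "' declared for this node");
        }
        return target;
    }

    public static void main(String[] args) {
        Map<String, String> arcs = arcsOfSleepNode();
        System.out.println(nextNode(arcs, chooseArc(1000)));
        System.out.println(nextNode(arcs, chooseArc(2000)));
    }
}
```

The point of the sketch is that the strings returned by the Java class and the @name@ attributes of the @ARC@ elements have to match.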

 D-Net offers three JobNode superclasses to choose from: 
 * @eu.dnetlib.msro.workflows.nodes.SimpleJobNode@: sync execution. It runs the @execute@ method in the main thread. 
 * @eu.dnetlib.msro.workflows.nodes.AsyncJobNode@: async execution. It runs the @execute@ method in a separate thread.  
 * @eu.dnetlib.msro.workflows.nodes.BlackboardJobNode@: use this class when your node must perform a blackboard request to a D-Net service 
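
The difference between the sync and async styles can be sketched with plain @java.util.concurrent@ classes. This is only an analogy under stated assumptions: @SyncVsAsyncSketch@ and its methods are hypothetical and do not reproduce how @SimpleJobNode@ or @AsyncJobNode@ are implemented.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only: contrasts running a step in the caller's thread
// with handing it over to a separate thread.
class SyncVsAsyncSketch {

    // Synchronous style: the step runs in the calling thread, which blocks until it returns.
    static String runSync(Callable<String> step) throws Exception {
        return step.call();
    }

    // Asynchronous style: the step is handed to another thread; the caller receives
    // a Future and can collect the result later.
    static Future<String> runAsync(ExecutorService executor, Callable<String> step) {
        return executor.submit(step);
    }

    public static void main(String[] args) throws Exception {
        // The step reports the name of the thread it ran in.
        Callable<String> step = () -> {
            Thread.sleep(100);
            return Thread.currentThread().getName();
        };
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println("sync ran in:  " + runSync(step));
            System.out.println("async ran in: " + runAsync(executor, step).get());
        } finally {
            executor.shutdown();
        }
    }
}
```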

 h4. Extending BlackboardJobNode 

 D-Net services can implement the BlackBoard (BB) protocol, which allows them to receive messages. 
 If a message is known to a service, the service will execute the corresponding action and notify the caller about its execution status by updating the blackboard in the service profile. The management of reading/writing BB messages is implemented in the @BlackboardJobNode@ class, so that you only have to: 
 * Specify the profile id of the service you want to call by overriding the method 
 <pre>abstract protected String obtainServiceId(NodeToken token);</pre> 
 * Prepare the BB message by overriding the method  
 <pre>abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;</pre> 

 Let's suppose we have a service accepting the BB message "SLEEP" that requires a parameter named @sleepTimeMs@. 
 Our @SleepJobNode@ class becomes: 
 <pre> 
 public class SleepJobNode extends BlackboardJobNode{ 
    private String xqueryForServiceId; 

    private int sleepTimeMs; 

    @Override 
    protected String obtainServiceId(final NodeToken token) { 
        List<String> serviceIds = getServiceLocator().getService(ISLookUpService.class).quickSearchProfile(xqueryForServiceId); 
        if (serviceIds.size() < 1) throw new RuntimeException("Service id not found using query: " + xqueryForServiceId); 
        return serviceIds.get(0); 
    }    

    @Override 
    protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { 
       job.setAction("SLEEP"); 
       // Passed as text, on the assumption that blackboard parameters are string-valued
       job.getParameters().put("sleepTimeMs", String.valueOf(sleepTimeMs)); 
    } 

    public int getSleepTimeMs(){ return sleepTimeMs;} 
    public void setSleepTimeMs(int time){ sleepTimeMs = time;} 
    public String getXqueryForServiceId() { return xqueryForServiceId;} 
    public void setXqueryForServiceId(final String xqueryForServiceId) { this.xqueryForServiceId = xqueryForServiceId;} 
 } 
 </pre> 

 And the workflow node must declare a new parameter for the @xqueryForServiceId@: 
 <pre> 
 <NODE name="sleep" isStart="true" type="Sleep"> 
     <DESCRIPTION>Sleep for a while</DESCRIPTION> 
     <PARAMETERS> 
         <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM> 
         <PARAM name="xqueryForServiceId" managedBy="user" type="string" required="true">/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='SleepServiceResourceType']/HEADER/RESOURCE_IDENTIFIER/@value/string()</PARAM> 
     </PARAMETERS> 
     <ARCS> 
         <ARC to="success"/> 
     </ARCS> 
 </NODE> 
 </pre> 
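
To make the exchange more concrete, here is a minimal in-memory model of a blackboard message and of a service that only knows the "SLEEP" action. It is a sketch under stated assumptions: @BlackboardSketch@, @Message@, @Status@ and @handle@ are hypothetical names, the status values are chosen for illustration, and marking an unknown action as failed is an assumption rather than documented D-Net behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: an in-memory model of the blackboard exchange.
class BlackboardSketch {

    // Hypothetical status values for a message.
    enum Status { ASSIGNED, DONE, FAILED }

    // A message posted on a service's blackboard: an action name plus string parameters.
    static class Message {
        final String action;
        final Map<String, String> parameters = new HashMap<>();
        Status status = Status.ASSIGNED;

        Message(String action) { this.action = action; }
    }

    // Hypothetical service side: executes known actions and records the outcome on the message.
    static void handle(Message message) {
        if (!"SLEEP".equals(message.action)) {
            message.status = Status.FAILED; // assumption: unknown actions are marked as failed
            return;
        }
        try {
            Thread.sleep(Long.parseLong(message.parameters.get("sleepTimeMs")));
            message.status = Status.DONE;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            message.status = Status.FAILED;
        } catch (NumberFormatException e) {
            message.status = Status.FAILED; // missing or malformed sleepTimeMs
        }
    }

    public static void main(String[] args) {
        // Caller side: corresponds to what prepareJob() fills in.
        Message message = new Message("SLEEP");
        message.parameters.put("sleepTimeMs", "50");
        handle(message);
        System.out.println(message.action + " -> " + message.status);
    }
}
```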

 But what if one of your params should not be set by an end user, but should rather be picked from the workflow env? 
 Let's suppose this is the case for the xquery. Then the @SleepJobNode@ class should expect to receive the name of the env attribute in which to look for the actual xquery to use: 
 <pre> 
 public class SleepJobNode extends BlackboardJobNode{ 
    private String xqueryForServiceIdParam; 
    . . .  
    @Override 
    protected String obtainServiceId(final NodeToken token) { 
        final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam()); 
        List<String> serviceIds = getServiceLocator().getService(ISLookUpService.class).quickSearchProfile(xquery); 
        . . . 
    }    
 . . .  
 </pre> 

 The workflow node then passes the name of the env attribute, managed by the system, instead of the xquery itself: 

 <pre> 
 <NODE name="sleep" isStart="true" type="Sleep"> 
     <DESCRIPTION>Sleep for a while</DESCRIPTION> 
     <PARAMETERS> 
         <PARAM name="sleepTimeMs" managedBy="user" type="int" required="true">1000</PARAM> 
         <PARAM managedBy="system" name="xqueryForServiceIdParam" required="true" type="string">xqueryForServiceId_in_env</PARAM> 
     </PARAMETERS> 
     <ARCS> 
         <ARC to="success"/> 
     </ARCS> 
 </NODE> 
 </pre> 

 h3. Special parameters 

 Note that in case of parameter name clashes, the order of precedence is @sysparams < envparams < params@: params set in the XML node override params in the env, which in turn override sys params. 
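
The precedence rule can be expressed as a simple layered merge. The sketch below is a standalone illustration, not the D-Net resolution code; @ParamPrecedenceSketch@, @resolve@ and the sample values are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: layered merge implementing sysparams < envparams < params.
class ParamPrecedenceSketch {

    // Later putAll calls overwrite earlier entries, so node params win over env params,
    // which in turn win over sys params.
    static Map<String, String> resolve(Map<String, String> sysParams,
                                       Map<String, String> envParams,
                                       Map<String, String> nodeParams) {
        Map<String, String> resolved = new HashMap<>(sysParams);
        resolved.putAll(envParams);
        resolved.putAll(nodeParams);
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> sys = Map.of("path", "/from/sys", "table", "sysTable");
        Map<String, String> env = Map.of("path", "/from/env");
        Map<String, String> node = Map.of("path", "/from/node");
        Map<String, String> resolved = resolve(sys, env, node);
        System.out.println("path=" + resolved.get("path") + " table=" + resolved.get("table"));
    }
}
```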

 h4. envParams 

 @envParams@ is a special parameter that allows you to map one value of the wf env to another parameter of the same wf env, so that values can be passed from node to node. 
 This can be useful when different nodes of the same workflow need the same value but are coded to read it from different env parameters. 
 Example: 
 <pre> 
 <PARAM required="true" type="string" name="envParams" managedBy="system"> 
   {  
      'path' : 'rottenRecordsPath' 
   } 
 </PARAM> 
 </pre> 
 The value of @envParams@ is a list of parameter mappings. In the example we have 1 mapping: the value in the env parameter named "rottenRecordsPath" is copied into another env parameter named "path". 
 Given the above configuration, a node will be able to find the value it needs in the expected env parameter ("path"). 
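
The direction of the mapping (target name on the left, source name on the right) is easy to get wrong, so here is a minimal sketch of the copy step. It models the env as a plain map; @EnvParamsSketch@, @applyEnvParams@ and the sample value are hypothetical, and silently skipping a missing source is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: applies envParams-style mappings to a map that stands in for the wf env.
class EnvParamsSketch {

    // For each mapping target -> source, copy env[source] into env[target].
    static void applyEnvParams(Map<String, String> env, Map<String, String> mappings) {
        for (Map.Entry<String, String> mapping : mappings.entrySet()) {
            String target = mapping.getKey();
            String source = mapping.getValue();
            if (env.containsKey(source)) { // assumption: a missing source is skipped
                env.put(target, env.get(source));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("rottenRecordsPath", "/tmp/rotten"); // hypothetical value set by an earlier node
        applyEnvParams(env, Map.of("path", "rottenRecordsPath"));
        System.out.println("path=" + env.get("path"));
    }
}
```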

 h4. sysParams 

 @sysParams@ is a special parameter that allows you to set a parameter with a value coming from a system property (those you set in *.properties files). 
 Example: 
 <pre> 
 <PARAM required="true" type="string" name="sysParams" managedBy="system"> 
 {  	
    'hbase.mapred.inputtable' : 'hbase.mapred.datatable',  
    'hbase.mapreduce.inputtable' : 'hbase.mapred.datatable' 
 } 
 </PARAM> 
 </pre> 
 The value of @sysParams@ is a list of parameter mappings. In the example we have two mappings: the value of the system property "hbase.mapred.datatable" will be copied into two new parameters, "hbase.mapred.inputtable" and "hbase.mapreduce.inputtable". 
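
A comparable sketch for @sysParams@, reading the source values from a @java.util.Properties@ object such as one loaded from a *.properties file. @SysParamsSketch@, @applySysParams@ and the table name @demo_table@ are hypothetical; how D-Net actually exposes system properties is not shown here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative only: derives node parameters from system properties via sysParams-style mappings.
class SysParamsSketch {

    // For each mapping parameter -> property, set the parameter to the property's value.
    static Map<String, String> applySysParams(Properties systemProperties, Map<String, String> mappings) {
        Map<String, String> params = new HashMap<>();
        for (Map.Entry<String, String> mapping : mappings.entrySet()) {
            String value = systemProperties.getProperty(mapping.getValue());
            if (value != null) { // assumption: undefined properties are skipped
                params.put(mapping.getKey(), value);
            }
        }
        return params;
    }

    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        // Stands in for the content of a *.properties file.
        properties.load(new StringReader("hbase.mapred.datatable=demo_table\n"));

        Map<String, String> mappings = new HashMap<>();
        mappings.put("hbase.mapred.inputtable", "hbase.mapred.datatable");
        mappings.put("hbase.mapreduce.inputtable", "hbase.mapred.datatable");

        Map<String, String> params = applySysParams(properties, mappings);
        System.out.println(params.get("hbase.mapred.inputtable"));
        System.out.println(params.get("hbase.mapreduce.inputtable"));
    }
}
```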

 h3. Re-using existing job node beans 

 Often you will need generic nodes in your workflow that have already been implemented and used. 
 Before you implement a new node for a generic feature, it is better to check "where is my bean":http://ci.research-infrastructures.eu/public/whereismybean/ and filter by "wfNode". 

 "Where is my bean":http://ci.research-infrastructures.eu/public/whereismybean/ will tell you which module and which application context define the bean you need. 
 Consider that the service looks into the whole @dnet40/modules@ svn directory, hence you will usually find several entries for the same bean. 

 Typically, the beans you need are located in the modules @dnet-msro-service@ and @dnet-*-workflows@ (e.g. @dnet-opnaireplus-workflows@ for the OpenAIRE project).  

 CNR suggests checking out those modules from svn so that you can quickly refer to the node implementations when you need them. 

 h3. Meta-Workflow definition 

 A meta-workflow implements a high-level functionality by composing workflows. 
 In the simplest scenario, one meta-workflow refers to only one workflow, as follows: 

 <pre> 
 <RESOURCE_PROFILE> 
     <HEADER> 
         <RESOURCE_IDENTIFIER 
             value="metaWF-UUID_TWV0YVdvcmtmbG93RFNSZXNvdXJjZXMvTWV0YVdvcmtmbG93RFNSZXNvdXJjZVR5cGU="/> 
         <RESOURCE_TYPE value="MetaWorkflowDSResourceType"/> 
         <RESOURCE_KIND value="MetaWorkflowDSResources"/> 
         <RESOURCE_URI value=""/> 
         <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/> 
     </HEADER> 
     <BODY> 
         <METAWORKFLOW_NAME family="tutorial">Sleeping</METAWORKFLOW_NAME> 
         <METAWORKFLOW_DESCRIPTION>A sample meta-wf</METAWORKFLOW_DESCRIPTION> 
         <METAWORKFLOW_SECTION>Tutorial</METAWORKFLOW_SECTION> 
         <ADMIN_EMAIL>wf-admin-1@wf.com,wf-admin-2@wf.com</ADMIN_EMAIL> 
         <CONFIGURATION status="EXECUTABLE"> 
             <WORKFLOW 
                 id="wf-profileID" name="Sleeping Beauty"/> 
         </CONFIGURATION> 
         <SCHEDULING enabled="false"> 
             <CRON>29 5 22 ? * *</CRON> 
             <MININTERVAL>10080</MININTERVAL> 
         </SCHEDULING> 
     </BODY> 
 </RESOURCE_PROFILE> 
 </pre> 

 Let's have a look at the information in the @BODY@ element: 
 * @METAWORKFLOW_NAME@, @family@, @METAWORKFLOW_DESCRIPTION@, and @METAWORKFLOW_SECTION@: guide the UI for a correct visualization 
 * @ADMIN_EMAIL@: comma-separated list of email addresses of the people who want to be notified about the failure and completion of the workflows included in this meta-workflow. 
 * @WORKFLOW@: reference to an existing workflow 
 * @SCHEDULING@:  
 ** @enabled@: set to true if you want the meta-workflow to run automatically according to the given schedule 
 ** @CRON@: cron expression to schedule the meta-workflow execution 
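 ** @MININTERVAL@: minimum interval between two subsequent runs of the meta-workflow. The unit is not confirmed here (possibly milliseconds); note that the example value 10080 equals the number of minutes in one week. 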
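
One plausible reading of the minimum interval is a guard that lets a scheduled run start only if enough time has passed since the previous one. The sketch below illustrates that reading only; it is an assumption, not the D-Net scheduler. @MinIntervalSketch@ and @mayRun@ are hypothetical, and the interval is passed as an explicit @Duration@ because the unit of @MININTERVAL@ is not established in this page.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: a minimum-interval guard between two runs.
class MinIntervalSketch {

    // Returns true if a new run may start at 'now', given the start of the previous run.
    static boolean mayRun(Instant lastRun, Instant now, Duration minInterval) {
        if (lastRun == null) {
            return true; // never ran before
        }
        return Duration.between(lastRun, now).compareTo(minInterval) >= 0;
    }

    public static void main(String[] args) {
        Instant lastRun = Instant.parse("2015-02-06T11:00:00Z");
        Duration minInterval = Duration.ofHours(1); // hypothetical value
        System.out.println(mayRun(lastRun, Instant.parse("2015-02-06T11:30:00Z"), minInterval));
        System.out.println(mayRun(lastRun, Instant.parse("2015-02-06T12:30:00Z"), minInterval));
    }
}
```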

 More complex meta-workflows may include different workflows running in sequence, in parallel, or in a mix of the two. 
 The data provision meta-workflow of OpenAIRE is a good example of a complex meta-workflow. It composes 10 different workflows, of which @profileID2@, @profileID3@, @profileID4@ and @profileID5@ run in parallel. Note that the parallel/sequence composition is defined by the level of nesting of the @WORKFLOW@ XML element. 
 <pre> 
 <CONFIGURATION status="EXECUTABLE"> 
     <WORKFLOW id="profileID1" name="reset"> 
         <WORKFLOW id="profileID2" name="db2hbase"/> 
         <WORKFLOW id="profileID3" name="oaf2hbase"/> 
         <WORKFLOW id="profileID4" name="odf2hbase"/> 
         <WORKFLOW id="profileID5" name="actions2hbase"> 
             <WORKFLOW id="profileID6" name="deduplication organizations"> 
                 <WORKFLOW id="profileID7" name="deduplication persons"> 
                     <WORKFLOW id="profileID8" name="deduplication results"> 
                         <WORKFLOW id="profileID9" name="update provision"> 
                             <WORKFLOW id="profileID10" name="prepare pre-public info space"/> 
                         </WORKFLOW> 
                     </WORKFLOW> 
                 </WORKFLOW> 
             </WORKFLOW> 
         </WORKFLOW> 
     </WORKFLOW> 
 </CONFIGURATION> 
 </pre>
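
To see how the nesting encodes the composition, the following standalone sketch parses a @CONFIGURATION@ fragment with the JDK DOM parser and lists, for each workflow, the workflows nested directly inside it. Following the description above, those nested workflows are read as starting after their parent, and in parallel with each other when there are several. The class @MetaWorkflowNestingSketch@ and the method @followers@ are hypothetical, and this is not how D-Net itself reads the profile.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

// Illustrative only: derives "who follows whom" from the nesting of WORKFLOW elements.
class MetaWorkflowNestingSketch {

    // Returns, for each WORKFLOW id, the ids of the workflows nested directly inside it.
    static Map<String, List<String>> followers(String configurationXml) throws Exception {
        Element root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(configurationXml.getBytes(StandardCharsets.UTF_8)))
                .getDocumentElement();
        Map<String, List<String>> result = new LinkedHashMap<>();
        collect(root, result);
        return result;
    }

    // Depth-first visit: records each WORKFLOW child under the id of its parent WORKFLOW.
    private static void collect(Element parent, Map<String, List<String>> result) {
        NodeList children = parent.getChildNodes();
        for (int i = 0; i < children.getLength(); i++) {
            Node child = children.item(i);
            if (child instanceof Element && "WORKFLOW".equals(child.getNodeName())) {
                Element workflow = (Element) child;
                if (parent.hasAttribute("id")) { // the CONFIGURATION root has no id
                    result.computeIfAbsent(parent.getAttribute("id"), key -> new ArrayList<>())
                          .add(workflow.getAttribute("id"));
                }
                collect(workflow, result);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String xml = "<CONFIGURATION status=\"EXECUTABLE\">"
                + "<WORKFLOW id=\"profileID1\" name=\"reset\">"
                + "<WORKFLOW id=\"profileID2\" name=\"db2hbase\"/>"
                + "<WORKFLOW id=\"profileID5\" name=\"actions2hbase\">"
                + "<WORKFLOW id=\"profileID6\" name=\"deduplication organizations\"/>"
                + "</WORKFLOW>"
                + "</WORKFLOW>"
                + "</CONFIGURATION>";
        followers(xml).forEach((id, next) -> System.out.println(id + " is followed by " + next));
    }
}
```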