Project

General

Profile

« Previous | Next » 

Revision 60318

updated hadoop-specific aggregation workflows

View differences:

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvTransformHadoopJobNode.java
11 11

  
12 12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13 13
import org.springframework.beans.factory.annotation.Autowired;
14
import org.springframework.beans.factory.annotation.Value;
14 15

  
15 16
public class PrepareEnvTransformHadoopJobNode extends SimpleJobNode {
16 17

  
......
25 26

  
26 27
    private String mdstoreOutput;
27 28

  
29
    @Value("${dnet.openaire.dhp.transformation.app.path}")
30
    private String oozieWfPath;
31

  
28 32
    @Override
29 33
    protected String execute(final NodeToken token) throws Exception {
30 34

  
31 35
        final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId());
32 36

  
37
        token.getEnv().setAttribute("oozieWfPath", getOozieWfPath());
33 38
        token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
34 39
        token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));
35 40
        token.getEnv().setAttribute("mdstoreInput", getMdstoreInput());
......
70 75
    public void setMdstoreOutput(String mdstoreOutput) {
71 76
        this.mdstoreOutput = mdstoreOutput;
72 77
    }
78

  
79
    public String getOozieWfPath() {
80
        return oozieWfPath;
81
    }
82

  
83
    public void setOozieWfPath(String oozieWfPath) {
84
        this.oozieWfPath = oozieWfPath;
85
    }
73 86
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvCollectHadoopJobNode.java
19 19
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
20 20
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
21 21
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
22
import org.springframework.beans.factory.annotation.Value;
22 23

  
23 24
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
24 25

  
......
38 39
     */
39 40
    private String collectionMode;
40 41

  
42
    @Value("${dnet.openaire.dhp.collection.app.path}")
43
    private String oozieWfPath;
44

  
41 45
    @Override
42 46
    protected String execute(final NodeToken token) throws Exception {
43 47

  
......
70 74
        if (opt.isPresent()) {
71 75
            token.getEnv().setAttribute("mdId", getMdId());
72 76
            token.getEnv().setAttribute("collectionMode", getCollectionMode());
77
            token.getEnv().setAttribute("oozieWfPath", getOozieWfPath());
73 78

  
74 79
            final ApiDescriptor api = opt.get();
75 80
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
......
117 122
    public void setCollectionMode(String collectionMode) {
118 123
        this.collectionMode = collectionMode;
119 124
    }
125

  
126
    public String getOozieWfPath() {
127
        return oozieWfPath;
128
    }
129

  
130
    public void setOozieWfPath(String oozieWfPath) {
131
        this.oozieWfPath = oozieWfPath;
132
    }
120 133
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/CreateMDStoreHadoopJobNode.java
3 3
import java.util.HashMap;
4 4
import java.util.Map;
5 5

  
6
import com.google.gson.Gson;
6 7
import org.apache.commons.logging.Log;
7 8
import org.apache.commons.logging.LogFactory;
8 9
import org.springframework.beans.factory.annotation.Required;
......
45 46
                .buildAndExpand(params)
46 47
                .toUri();
47 48

  
49
        log.info("create mdstore request: " + uri.toString());
50

  
48 51
        final RestTemplate restTemplate = new RestTemplate();
49 52
        final MDStoreWithInfo result = restTemplate.getForObject(uri, MDStoreWithInfo.class);
50 53

  
51
        log.info("mdstore created " + result.toString());
54
        log.info("created mdstore: " + new Gson().toJson(result));
52 55

  
53 56
        token.getEnv().setAttribute(getOutputPrefix() + "format", format);
54 57
        token.getEnv().setAttribute(getOutputPrefix() + "layout", layout);
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/repo-hi/Aggregate_Metadata_from_PubsRepository_Hadoop.xml
7 7
        <DATE_OF_CREATION value="2019-04-03T16:44:41+02:00"/>
8 8
    </HEADER>
9 9
    <BODY>
10
        <WORKFLOW_NAME>Aggregate Metadata (publications) from PubsRepository [Hadoop]</WORKFLOW_NAME>
10
        <WORKFLOW_NAME>Aggregate Metadata DublinCore format [Hadoop]</WORKFLOW_NAME>
11 11
        <WORKFLOW_INFO>
12 12
            <FIELD name="Action">Aggregate Metadata</FIELD>
13 13
            <FIELD name="Consequence IS">Support</FIELD>
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/repo-hi/Aggregate_Metadata_from_DataRepository_Hadoop.xml
1
<RESOURCE_PROFILE>
2
    <HEADER>
3
        <RESOURCE_IDENTIFIER value="513da605-e9b4-4150-9116-f82a34a8585g_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
4
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
5
        <RESOURCE_KIND value="WorkflowDSResources"/>
6
        <RESOURCE_URI value="value3"/>
7
        <DATE_OF_CREATION value="2019-04-03T16:44:41+02:00"/>
8
    </HEADER>
9
    <BODY>
10
        <WORKFLOW_NAME>Aggregate Metadata Datacite format [Hadoop]</WORKFLOW_NAME>
11
        <WORKFLOW_INFO>
12
            <FIELD name="Action">Aggregate Metadata</FIELD>
13
            <FIELD name="Consequence IS">Support</FIELD>
14
            <FIELD name="Datasource class">PubsRepository</FIELD>
15
            <FIELD name="Content">publications</FIELD>
16
        </WORKFLOW_INFO>
17
        <WORKFLOW_TYPE>REPO_HI</WORKFLOW_TYPE>
18
        <WORKFLOW_PRIORITY>20</WORKFLOW_PRIORITY>
19
        <CONFIGURATION start="manual">
20
            <NODE isStart="true" name="VerifyDatasource" type="VerifyDatasource">
21
                <DESCRIPTION>Verify if DS is pending</DESCRIPTION>
22
                <PARAMETERS>
23
                    <PARAM managedBy="system" name="expectedInterfaceTypologyPrefixes" required="false" type="string"/>
24
                    <PARAM managedBy="system" name="expectedCompliancePrefixes" required="false" type="string"/>
25
                </PARAMETERS>
26
                <ARCS>
27
                    <ARC to="createMetaWf"/>
28
                    <ARC name="validateDs" to="validateDs"/>
29
                </ARCS>
30
            </NODE>
31
            <NODE name="validateDs" type="ValidateDatasource">
32
                <DESCRIPTION>Validate DS</DESCRIPTION>
33
                <PARAMETERS/>
34
                <ARCS>
35
                    <ARC to="createMetaWf"/>
36
                </ARCS>
37
            </NODE>
38
            <NODE name="createMetaWf" type="RegisterMetaWf">
39
                <DESCRIPTION>Create MetaWorkflow</DESCRIPTION>
40
                <PARAMETERS>
41
                    <PARAM managedBy="system" name="wfName" required="true" type="string">Aggregate Metadata (publications) from PubsRepository [Hadoop]
42
                    </PARAM>
43
                </PARAMETERS>
44
                <ARCS>
45
                    <ARC to="createDc"/>
46
                    <ARC to="createOaf"/>
47
                </ARCS>
48
            </NODE>
49
            <NODE name="createDc" type="CreateMDStoreHadoop">
50
                <DESCRIPTION>Create oai_datacite store</DESCRIPTION>
51
                <PARAMETERS>
52
                    <PARAM managedBy="system" name="format" required="true" type="string">oai_datacite</PARAM>
53
                    <PARAM managedBy="system" name="interpretation" required="true" type="string">native</PARAM>
54
                    <PARAM managedBy="system" name="layout" required="true" type="string">store</PARAM>
55
                    <PARAM managedBy="system" name="outputPrefix" required="true" type="string">harv_</PARAM>
56
                </PARAMETERS>
57
                <ARCS>
58
                    <ARC to="updateMetaWf"/>
59
                </ARCS>
60
            </NODE>
61
            <NODE name="createOaf" type="CreateMDStoreHadoop">
62
                <DESCRIPTION>Create ODF store</DESCRIPTION>
63
                <PARAMETERS>
64
                    <PARAM managedBy="system" name="format" required="true" type="string">ODF</PARAM>
65
                    <PARAM managedBy="system" name="interpretation" required="true" type="string">cleaned</PARAM>
66
                    <PARAM managedBy="system" name="layout" required="true" type="string">store</PARAM>
67
                    <PARAM managedBy="system" name="outputPrefix" required="true" type="string">tran_</PARAM>
68
                </PARAMETERS>
69
                <ARCS>
70
                    <ARC to="updateMetaWf"/>
71
                </ARCS>
72
            </NODE>
73
            <NODE isJoin="true" name="updateMetaWf" type="UpdateMetaWf">
74
                <DESCRIPTION>Update MetaWorkflow</DESCRIPTION>
75
                <PARAMETERS>
76
                    <PARAM managedBy="system" name="beanName" required="true" type="string">metaWfPubsRepositoryHadoop</PARAM>
77
                </PARAMETERS>
78
                <ARCS>
79
                    <ARC to="updateMetaWfStatus"/>
80
                </ARCS>
81
            </NODE>
82
            <NODE name="updateMetaWfStatus" type="UpdateOpenaireMetaWfStatus">
83
                <DESCRIPTION>Update MetaWorkflow Status</DESCRIPTION>
84
                <PARAMETERS/>
85
                <ARCS>
86
                    <ARC to="success"/>
87
                </ARCS>
88
            </NODE>
89
        </CONFIGURATION>
90

  
91
        <STATUS/>
92
    </BODY>
93
</RESOURCE_PROFILE>
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/applicationContext-msro-openaireplus.properties
96 96
  and  $x//CONFIGURATION/context/param[./@name='status']/text() != 'hidden'\
97 97
  return $x//CONFIGURATION/context/@id/string()
98 98
  
99
  dnet.openorgs.db.name=dev_orgs
99
dnet.openorgs.db.name=dev_orgs
100

  
101
dnet.openaire.dhp.collection.app.path = /user/dnet.dev/lib/collection/oozie_app
102
dnet.openaire.dhp.transformation.app.path = /user/dnet.dev/lib/transformation/oozie_app
100 103
  
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st
23 23
<NODE name="PREPARE_ENV_COLLECTION" type="PrepareEnvCollectHadoopJobNode">
24 24
    <DESCRIPTION>Set in the environment all the variables needed by the collection oozie job</DESCRIPTION>
25 25
    <PARAMETERS>
26
        <PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
26
        <PARAM required="true" type="string" name="mdId" managedBy="system" category="MDSTORE_ID">$params.("harv_id")$</PARAM>
27 27
        <PARAM required="true" type="string" name="collectionMode" managedBy="user" function="validValues(['REFRESH','INCREMENTAL'])"></PARAM>
28 28
    </PARAMETERS>
29 29
    <ARCS>
......
45 45
                "dataSourceInfo":"dataSourceInfo",
46 46
                "timestamp":"timestamp",
47 47
                "identifierPath":"identifierPath",
48
                "workflowId":"workflowId"
48
                "workflowId":"workflowId",
49
                "oozie.wf.application.path":"oozieWfPath"
49 50
            }
50 51
        </PARAM>
51 52
    </PARAMETERS>
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/transform.wf.st
24 24
    <DESCRIPTION>Retrieve all the parameters needed to run the transformation workflow</DESCRIPTION>
25 25
    <PARAMETERS>
26 26
        <PARAM category="TRANSFORMATION_RULE_ID" function="listProfiles('TransformationRuleDSResourceType', '//TITLE')" managedBy="user" name="ruleId" required="true" type="string"></PARAM>
27
        <PARAM required="true" type="string" name="mdstoreInput" managedBy="system">$params.("harv_id")$</PARAM>
28
        <PARAM required="true" type="string" name="mdstoreOutput" managedBy="system">$params.("tran_id")$</PARAM>
27
        <PARAM required="true" type="string" name="mdstoreInput" managedBy="system" category="MDSTORE_ID">$params.("harv_id")$</PARAM>
28
        <PARAM required="true" type="string" name="mdstoreOutput" managedBy="system" category="MDSTORE_ID">$params.("tran_id")$</PARAM>
29 29
    </PARAMETERS>
30 30
    <ARCS>
31 31
        <ARC to="TRANSFORM_HADOOP"/>
......
43 43
                "mdstoreOutput":"mdstoreOutput",
44 44
                "transformationRule":"transformationRule",
45 45
                "timestamp":"timestamp",
46
                "workflowId":"workflowId"
46
                "workflowId":"workflowId",
47
                "oozie.wf.application.path":"oozieWfPath"
47 48
            }
48 49
        </PARAM>
49 50
    </PARAMETERS>

Also available in: Unified diff