Revision 60316

updated hadoop-specific aggregation workflows

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvTransformHadoopJobNode.java
 
     private static final Log log = LogFactory.getLog(PrepareEnvTransformHadoopJobNode.class);
 
-    private String hdfsBasePath;
-
     private String ruleId;
 
     @Autowired
     private UniqueServiceLocator serviceLocator;
 
+    private String mdstoreInput;
+
+    private String mdstoreOutput;
+
     @Override
     protected String execute(final NodeToken token) throws Exception {
 
-        final String reading_mdId = token.getEnv().getAttribute("reading_mdId");
-        final String reading_versionId = token.getEnv().getAttribute("reading_versionId");
-
-        final String writing_mdId = token.getEnv().getAttribute("mdId");
-        final String writing_versionId = token.getEnv().getAttribute("versionId");
-
-        final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId);
-        final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId);
-
-
         final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId());
 
         token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
         token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));
-        token.getEnv().setAttribute("mdstoreInputPath", hdfsNativePath);
-        token.getEnv().setAttribute("mdstoreOutputPath", hdfsTransformedPath);
+        token.getEnv().setAttribute("mdstoreInput", getMdstoreInput());
+        token.getEnv().setAttribute("mdstoreOutput", getMdstoreOutput());
         token.getEnv().setAttribute("transformationRule", DHPUtils.compressString(trRule));
         return Arc.DEFAULT_ARC;
 
     }
 
-    public String getHdfsBasePath() {
-        return hdfsBasePath;
-    }
-
-    public void setHdfsBasePath(final String hdfsBasePath) {
-        this.hdfsBasePath = hdfsBasePath;
-    }
-
     public String getRuleId() {
         return ruleId;
     }
......
     public void setServiceLocator(UniqueServiceLocator serviceLocator) {
         this.serviceLocator = serviceLocator;
     }
+
+    public String getMdstoreInput() {
+        return mdstoreInput;
+    }
+
+    public void setMdstoreInput(String mdstoreInput) {
+        this.mdstoreInput = mdstoreInput;
+    }
+
+    public String getMdstoreOutput() {
+        return mdstoreOutput;
+    }
+
+    public void setMdstoreOutput(String mdstoreOutput) {
+        this.mdstoreOutput = mdstoreOutput;
+    }
 }
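Note on the transformationRule attribute set above: the rule profile fetched from the ISLookUpService is stored in the env in compressed form via DHPUtils.compressString, so the potentially large XML can travel as a single job property. A minimal sketch of what such a helper typically does, assuming a GZIP-plus-Base64 scheme (the actual dnet-hadoop implementation may differ):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

/**
 * Illustrative stand-in for DHPUtils.compressString (assumed behaviour, not the real implementation):
 * gzip the profile and Base64-encode it so it fits into a plain-text env attribute.
 */
public final class ProfileCompression {

    private ProfileCompression() {}

    public static String compressString(final String input) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
            try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
                gzip.write(input.getBytes(StandardCharsets.UTF_8));
            }
            // the gzip stream is closed before reading the buffer, so all bytes are flushed
            return Base64.getEncoder().encodeToString(baos.toByteArray());
        } catch (final IOException e) {
            throw new IllegalStateException("error compressing the transformation rule", e);
        }
    }
}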
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvCollectHadoopJobNode.java
 
 import eu.dnetlib.dhp.model.mdstore.Provenance;
 import eu.dnetlib.enabling.datasources.common.Datasource;
+import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.beans.factory.annotation.Autowired;
......
 public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
 
     private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
+    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
 
     @Autowired
     private LocalDatasourceManager<?, ?> dsManager;
 
-    private String hdfsBasePath;
+    /**
+     * MDStore identifier
+     */
+    private String mdId;
 
+    /**
+     * REFRESH | INCREMENTAL
+     */
+    private String collectionMode;
+
     @Override
     protected String execute(final NodeToken token) throws Exception {
 
......
         // param 3 : nameNode
 
         final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
+        log.info("dsId: " + dsId);
         final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
-        final String mdId = token.getEnv().getAttribute("mdId");
-        final String versionId = token.getEnv().getAttribute("versionId");
+        log.info("apiId: " + apiId);
 
         final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
                 .stream()
......
                     res.setBaseUrl(a.getBaseurl());
                     res.setId(a.getId());
                     res.setProtocol(a.getProtocol());
-                    res.getParams().put("metadata_identifier_path", a.getMetadataIdentifierPath());
+                    res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
                     res.getParams().putAll(a.getApiParams()
                             .stream()
                             .map(o -> (ApiParam) o)
......
                 .findFirst();
 
         if (opt.isPresent()) {
+            token.getEnv().setAttribute("mdId", getMdId());
+            token.getEnv().setAttribute("collectionMode", getCollectionMode());
+
             final ApiDescriptor api = opt.get();
-            final String hdfsPath = String.format("%s/%s/%s/store", hdfsBasePath, mdId, versionId);
-            final String seqFilePath = String.format("%s/%s/%s/seqFile", hdfsBasePath, mdId, versionId);
             token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
-            token.getEnv().setAttribute("mdStorePath", hdfsPath);
-            token.getEnv().setAttribute("sequenceFilePath", seqFilePath);
+
             final Provenance provenance = new Provenance();
             provenance.setDatasourceId(dsId);
             final Datasource<?, ?> ds = dsManager.getDs(dsId);
             provenance.setDatasourceName(ds.getOfficialname());
             provenance.setNsPrefix(ds.getNamespaceprefix());
-            token.getEnv().setAttribute("dataSourceInfo", new Gson().toJson(provenance));
+            final String dsProvenance = new Gson().toJson(provenance);
+            log.info("datasource provenance: " + dsProvenance);
+
+            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
             token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
-            token.getEnv().setAttribute("identifierPath",api.getParams().get("metadata_identifier_path"));
+            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
             token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
 
             token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
......
 
             return Arc.DEFAULT_ARC;
         } else {
-            return "abort";
+            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
         }
+    }
 
+    public String getMdId() {
+        return mdId;
     }
 
-    public String getHdfsBasePath() {
-        return hdfsBasePath;
+    public void setMdId(String mdId) {
+        this.mdId = mdId;
     }
 
-    public void setHdfsBasePath(String hdfsBasePath) {
-        this.hdfsBasePath = hdfsBasePath;
+    public String getCollectionMode() {
+        return collectionMode;
     }
+
+    public void setCollectionMode(String collectionMode) {
+        this.collectionMode = collectionMode;
+    }
 }
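The new collectionMode field is documented as REFRESH | INCREMENTAL, and the collection workflow below constrains the matching user parameter with validValues(['REFRESH','INCREMENTAL']). A small illustrative sketch of how a consumer could model the two values; the CollectionMode enum is hypothetical and not part of this changeset:

/**
 * Hypothetical enum mirroring the two values accepted by the collectionMode parameter.
 */
public enum CollectionMode {
    REFRESH,
    INCREMENTAL;

    /** Fails fast on anything other than REFRESH or INCREMENTAL (case-insensitive). */
    public static CollectionMode parse(final String value) {
        return CollectionMode.valueOf(value.trim().toUpperCase());
    }
}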
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st
- <NODE name="SET_INFO" isStart="true" type="SetProviderInfo">
+<NODE name="SET_INFO" isStart="true" type="SetProviderInfo">
 	<DESCRIPTION>Set information about current provider</DESCRIPTION>
 	<PARAMETERS>
 		<PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
......
 		<PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
 	</PARAMETERS>
 	<ARCS>
-		<ARC to="PREPARE_STORE_VERSION"/>
+		<ARC to="PREPARE_ENV_COLLECTION"/>
 	</ARCS>
 </NODE>
-<NODE name="PREPARE_STORE_VERSION" type="PrepareMDStoreVersion">
-	<DESCRIPTION>Prepare a new MdStore Version</DESCRIPTION>
-	<PARAMETERS>
-		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
-	</PARAMETERS>
-	<ARCS>
-        <ARC to="PREPARE_ENV_COLLECTION"/>
-    </ARCS>
-</NODE>
+
 <NODE name="PREPARE_ENV_COLLECTION" type="PrepareEnvCollectHadoopJobNode">
-    <DESCRIPTION>Put in the environment all the variable needed to the collection oozie job </DESCRIPTION>
+    <DESCRIPTION>Set in the environment all the variable needed to the collection oozie job</DESCRIPTION>
     <PARAMETERS>
-        <PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"></PARAM>
+        <PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
+        <PARAM required="true" type="string" name="collectionMode" managedBy="user" function="validValues(['REFRESH','INCREMENTAL'])"></PARAM>
     </PARAMETERS>
     <ARCS>
         <ARC to="COLLECT_HADOOP"/>
     </ARCS>
 </NODE>
-<NODE name="COLLECT_HADOOP" type="SubmitDnetHadoopJobNode">
+
+<NODE name="COLLECT_HADOOP" type="SubmitHadoopJob">
     <DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
     <PARAMETERS>
-        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopCollection</PARAM>
+        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
         <PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
         <PARAM managedBy="system" name="envParams" required="true" type="string">
             {
+                "mdId":"mdId",
+                "collectionMode":"collectionMode",
                 "apiDescription":"apiDescription",
-                "mdStorePath":"mdStorePath",
                 "sequenceFilePath":"sequenceFilePath",
-                "dataSourceInfo":"dataSourceInfo"  ,
+                "dataSourceInfo":"dataSourceInfo",
                 "timestamp":"timestamp",
                 "identifierPath":"identifierPath",
                 "workflowId":"workflowId"
......
         </PARAM>
     </PARAMETERS>
     <ARCS>
-        <ARC to="COMMIT_VERSION"/>
-        <ARC name="abort" to="ABORT_VERSION"/>
+        <ARC to="UPDATE_INFO"/>
     </ARCS>
 </NODE>
 
-<NODE name="COMMIT_VERSION" type="CommitMDStoreVersion">
-	<DESCRIPTION>Commit the mdstore version</DESCRIPTION>
-	<PARAMETERS/>
-	<ARCS>
-		<ARC to="UPDATE_INFO"/>
-	</ARCS>
-</NODE>
-
-<NODE name="ABORT_VERSION" type="AbortMDStoreVersion">
-	<DESCRIPTION>Abort the mdstore version</DESCRIPTION>
-	<PARAMETERS/>
-	<ARCS>
-		<ARC to="failure"/>
-	</ARCS>
-</NODE>
-
 <NODE name="UPDATE_INFO" type="MDStoreToApiExtraFieldHadoop">
 	<DESCRIPTION>Update datasouce API extra fields</DESCRIPTION>
 	<PARAMETERS>
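The envParams JSON handed to the SubmitHadoopJob node (above, and in the transform template below) maps each property expected by the executeOozieJob job (the keys) to the workflow env attribute holding its value (the values). A rough sketch of how such a mapping could be resolved into job properties, assuming a Gson-based parse; EnvParamsResolver and resolve are hypothetical names, not the actual submission node:

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.lang.reflect.Type;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical helper: turns the envParams mapping (job property name -> env attribute name)
 * plus the current env attributes into the properties handed to the Hadoop/Oozie job.
 */
public class EnvParamsResolver {

    public Properties resolve(final String envParamsJson, final Map<String, String> envAttributes) {
        final Type mapType = new TypeToken<LinkedHashMap<String, String>>() {}.getType();
        final Map<String, String> mapping = new Gson().fromJson(envParamsJson, mapType);

        final Properties jobProps = new Properties();
        // copy each referenced env attribute under the property name expected by the job
        mapping.forEach((jobParam, envAttribute) ->
                jobProps.setProperty(jobParam, envAttributes.getOrDefault(envAttribute, "")));
        return jobProps;
    }
}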
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/transform.wf.st
 		<PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
 	</PARAMETERS>
 	<ARCS>
-		<ARC to="START_READING"/>
+		<ARC to="PREPARE_ENV_TRANSFORMATION"/>
 	</ARCS>
 </NODE>
 
-<NODE name="START_READING" type="StartReadingMDStore">
-	<DESCRIPTION>Start reading Hadoop MD Store</DESCRIPTION>
-	<PARAMETERS>
-		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
-	</PARAMETERS>
-	<ARCS>
-		<ARC to="PREPARE_STORE_VERSION"/>
-	</ARCS>
-</NODE>
-
-<NODE name="PREPARE_STORE_VERSION" type="PrepareMDStoreVersion">
-	<DESCRIPTION>Prepare a new MdStore Version</DESCRIPTION>
-	<PARAMETERS>
-		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("tran_id")$</PARAM>
-	</PARAMETERS>
-	<ARCS>
-        <ARC to="PREPARE_ENV_TRANSFORMATION"/>
-    </ARCS>
-</NODE>
 <NODE name="PREPARE_ENV_TRANSFORMATION" type="PrepareEnvTransformHadoopJobNode">
     <DESCRIPTION>Retrieve all the parameters needed to run the transformation workflow</DESCRIPTION>
     <PARAMETERS>
         <PARAM category="TRANSFORMATION_RULE_ID" function="listProfiles('TransformationRuleDSResourceType', '//TITLE')" managedBy="user" name="ruleId" required="true" type="string"></PARAM>
-        <PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"></PARAM>
+        <PARAM required="true" type="string" name="mdstoreInput" managedBy="system">$params.("harv_id")$</PARAM>
+        <PARAM required="true" type="string" name="mdstoreOutput" managedBy="system">$params.("tran_id")$</PARAM>
     </PARAMETERS>
     <ARCS>
         <ARC to="TRANSFORM_HADOOP"/>
     </ARCS>
 </NODE>
-<NODE name="TRANSFORM_HADOOP" type="SubmitDnetHadoopJobNode">
+
+<NODE name="TRANSFORM_HADOOP" type="SubmitHadoopJob">
     <DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
     <PARAMETERS>
-        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopTrasnformation</PARAM>
+        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
         <PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
         <PARAM managedBy="system" name="envParams" required="true" type="string">
             {
-                "mdstoreInputPath":"mdstoreInputPath",
-                "mdstoreOutputPath":"mdstoreOutputPath",
+                "mdstoreInput":"mdstoreInput",
+                "mdstoreOutput":"mdstoreOutput",
                 "transformationRule":"transformationRule",
                 "timestamp":"timestamp",
                 "workflowId":"workflowId"
......
         </PARAM>
     </PARAMETERS>
     <ARCS>
-        <ARC to="COMMIT_VERSION"/>
-        <ARC name="abort" to="ABORT_VERSION"/>
-    </ARCS>
-</NODE>
-<NODE name="COMMIT_VERSION" type="CommitMDStoreVersion">
-    <DESCRIPTION>Commit the mdstore version</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
-        <ARC to="END_READING"/>
-    </ARCS>
-</NODE>
-<NODE name="END_READING" type="EndReadingMDStore">
-    <DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
         <ARC to="UPDATE_INFO"/>
     </ARCS>
 </NODE>
-<NODE name="ABORT_VERSION" type="AbortMDStoreVersion">
-    <DESCRIPTION>Abort the mdstore version</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
-        <ARC to="END_READING_ABORT"/>
-    </ARCS>
-</NODE>
-<NODE name="END_READING_ABORT" type="EndReadingMDStore">
-    <DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
-        <ARC to="failure"/>
-    </ARCS>
-</NODE>
 
 <NODE name="UPDATE_INFO" type="MDStoreToApiExtraFieldHadoop">
 	<DESCRIPTION>Update datasouce API extra fields</DESCRIPTION>
