Revision 55281

Implemented Hadoop Transformation

View differences:

modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/TransformHadoopJobNode.java (deleted)

-package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.googlecode.sarasvati.Arc;
-import com.googlecode.sarasvati.NodeToken;
-
-import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
-
-public class TransformHadoopJobNode extends SimpleJobNode {
-
-	private static final Log log = LogFactory.getLog(TransformHadoopJobNode.class);
-
-	private String ruleId;
-
-	@Override
-	protected String execute(final NodeToken token) throws Exception {
-		// param 1 : hdfs path from
-		// param 2 : hdfs path to
-		// param 3 : transformation rule
-		// param 4 : nameNode
-
-		final String reading_mdId = token.getEnv().getAttribute("reading_mdId");
-		final String reading_versionId = token.getEnv().getAttribute("reading_versionId");
-
-		final String writing_mdId = token.getEnv().getAttribute("mdId");
-		final String writing_versionId = token.getEnv().getAttribute("versionId");
-
-		final String readingHdfsPath = "hdfs://mdstores/" + reading_mdId + "/" + reading_versionId;
-		final String writingHdfsPath = "hdfs://mdstores/" + writing_mdId + "/" + writing_versionId;
-
-		final String nameNode = "TransformHadoop";
-
-		// TODO
-
-		log.info("Transformation rule: " + ruleId);
-		log.info("HDFS PATH for reading: " + readingHdfsPath);
-		log.info("HDFS PATH for writing: " + writingHdfsPath);
-		log.info("NameNode: " + nameNode);
-
-		token.getEnv().setAttribute("mdStoreSize", 1234);
-
-		return Arc.DEFAULT_ARC;
-	}
-
-	public String getRuleId() {
-		return ruleId;
-	}
-
-	public void setRuleId(final String ruleId) {
-		this.ruleId = ruleId;
-	}
-}
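
The deleted node was only a placeholder: its execute() logged the rule id and the derived HDFS paths, left a TODO where the actual work belonged, and hard-coded mdStoreSize to 1234. Its responsibilities move to PrepareEnvTransformHadoopJobNode (extended below), with the job submission itself handled by the generic SubmitDnetHadoopJobNode already used for collection.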
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/PrepareEnvTransformHadoopJobNode.java

 package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
 
+import eu.dnetlib.dhp.utils.DHPUtils;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
+import eu.dnetlib.enabling.locators.UniqueServiceLocator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
......
 import com.googlecode.sarasvati.NodeToken;
 
 import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
+import org.springframework.beans.factory.annotation.Autowired;
 
 public class PrepareEnvTransformHadoopJobNode extends SimpleJobNode {
 
......
 
 	private String hdfsBasePath;
 
+	private String ruleId;
+
+	@Autowired
+	private UniqueServiceLocator serviceLocator;
+
 	@Override
 	protected String execute(final NodeToken token) throws Exception {
 
......
 		final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId);
 		final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId);
 
+
+		final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId());
+
 		token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
 		token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));
-		token.getEnv().setAttribute("hdfsNativePath", hdfsNativePath);
-		token.getEnv().setAttribute("hdfsTransformedPath", hdfsTransformedPath);
+		token.getEnv().setAttribute("mdstoreInputPath", hdfsNativePath);
+		token.getEnv().setAttribute("mdstoreOutputPath", hdfsTransformedPath);
+		token.getEnv().setAttribute("transformationRule", DHPUtils.compressString(trRule));
 		return Arc.DEFAULT_ARC;
 
 	}
......
 	public void setHdfsBasePath(final String hdfsBasePath) {
 		this.hdfsBasePath = hdfsBasePath;
 	}
+
+	public String getRuleId() {
+		return ruleId;
+	}
+
+	public void setRuleId(String ruleId) {
+		this.ruleId = ruleId;
+	}
+
+	public UniqueServiceLocator getServiceLocator() {
+		return serviceLocator;
+	}
+
+	public void setServiceLocator(UniqueServiceLocator serviceLocator) {
+		this.serviceLocator = serviceLocator;
+	}
 }
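
The node now resolves the transformation rule profile through the ISLookUpService and puts it into the workflow env compressed, so the potentially large rule XML travels as a compact string under the "transformationRule" attribute (matching the envParams mapping in transform.wf.st below). The body of DHPUtils.compressString is not part of this diff; a minimal sketch of an equivalent compress/decompress pair, assuming the conventional GZIP-plus-Base64 encoding (the class and method names here are ours, not the library's):

	import java.io.ByteArrayInputStream;
	import java.io.ByteArrayOutputStream;
	import java.io.IOException;
	import java.nio.charset.StandardCharsets;
	import java.util.Base64;
	import java.util.zip.GZIPInputStream;
	import java.util.zip.GZIPOutputStream;

	public final class RuleCodecSketch {

		// Compress a rule profile to a Base64 string, as DHPUtils.compressString is assumed to do.
		public static String compress(final String s) throws IOException {
			final ByteArrayOutputStream bos = new ByteArrayOutputStream();
			try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
				gzip.write(s.getBytes(StandardCharsets.UTF_8));
			}
			return Base64.getEncoder().encodeToString(bos.toByteArray());
		}

		// Reverse operation, e.g. for the Hadoop job that receives "transformationRule".
		public static String decompress(final String encoded) throws IOException {
			final byte[] bytes = Base64.getDecoder().decode(encoded);
			try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
				return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
			}
		}
	}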
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st

     <DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
     <PARAMETERS>
         <PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopCollection</PARAM>
-        <PARAM managedBy="system" name="cluster" required="true" type="string">DHP</PARAM>
+        <PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
         <PARAM managedBy="system" name="envParams" required="true" type="string">
             {
                 "apiDescription":"apiDescription",
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/transform.wf.st

 		<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("tran_id")$</PARAM>
 	</PARAMETERS>
 	<ARCS>
-		<ARC to="PREPARE_ENV_TRANSFORM"/>
-	</ARCS>
+        <ARC to="PREPARE_ENV_TRANSFORMATION"/>
+    </ARCS>
 </NODE>
-
-<NODE name="PREPARE_ENV_TRANSFORM" type="PrepareEnvTransformHadoopJobNode">
-    <DESCRIPTION>Put in the environment all the variable needed to the transform oozie job </DESCRIPTION>
+<NODE name="PREPARE_ENV_TRANSFORMATION" type="PrepareEnvTransformHadoopJobNode">
+    <DESCRIPTION>Retrieve all the parameters needed to run the transformation workflow</DESCRIPTION>
     <PARAMETERS>
+        <PARAM category="TRANSFORMATION_RULE_ID" function="listProfiles('TransformationRuleDSResourceType', '//TITLE')" managedBy="user" name="ruleId" required="true" type="string"></PARAM>
         <PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"></PARAM>
     </PARAMETERS>
     <ARCS>
         <ARC to="TRANSFORM_HADOOP"/>
     </ARCS>
 </NODE>
-
-
 <NODE name="TRANSFORM_HADOOP" type="SubmitDnetHadoopJobNode">
-	<DESCRIPTION>Transform original records</DESCRIPTION>
-	<PARAMETERS>
-		<PARAM required="true" type="string" name="ruleId" category="TRANSFORMATION_RULE_ID" managedBy="user" function="listProfiles('TransformationRuleDSResourceType', '//TITLE')"></PARAM>
-		<PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopCollection</PARAM>
-        <PARAM managedBy="system" name="cluster" required="true" type="string">DHP</PARAM>
+    <DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
+    <PARAMETERS>
+        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopTrasnformation</PARAM>
+        <PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
         <PARAM managedBy="system" name="envParams" required="true" type="string">
             {
-                "hdfsNativePath":"hdfsNativePath",
-                "hdfsTransformedPath":"hdfsTransformedPath",
+                "mdstoreInputPath":"mdstoreInputPath",
+                "mdstoreOutputPath":"mdstoreOutputPath",
+                "transformationRule":"transformationRule",
                 "timestamp":"timestamp",
                 "workflowId":"workflowId"
             }
         </PARAM>
-	</PARAMETERS>
-	<ARCS>
-		<ARC to="COMMIT_VERSION"/>
-	</ARCS>
+    </PARAMETERS>
+    <ARCS>
+        <ARC to="COMMIT_VERSION"/>
+        <ARC name="abort" to="ABORT_VERSION"/>
+    </ARCS>
 </NODE>
-
 <NODE name="COMMIT_VERSION" type="CommitMDStoreVersion">
-	<DESCRIPTION>Commit the mdstore version</DESCRIPTION>
-	<PARAMETERS/>
-	<ARCS>
-		<ARC to="END_READING"/>
-	</ARCS>
+    <DESCRIPTION>Commit the mdstore version</DESCRIPTION>
+    <PARAMETERS/>
+    <ARCS>
+        <ARC to="END_READING"/>
+    </ARCS>
 </NODE>
-
 <NODE name="END_READING" type="EndReadingMDStore">
-	<DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
-	<PARAMETERS/>
-	<ARCS>
-		<ARC to="UPDATE_INFO"/>
-	</ARCS>
+    <DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
+    <PARAMETERS/>
+    <ARCS>
+        <ARC to="UPDATE_INFO"/>
+    </ARCS>
 </NODE>
+<NODE name="ABORT_VERSION" type="AbortMDStoreVersion">
+    <DESCRIPTION>Abort the mdstore version</DESCRIPTION>
+    <PARAMETERS/>
+    <ARCS>
+        <ARC to="END_READING_ABORT"/>
+    </ARCS>
+</NODE>
+<NODE name="END_READING_ABORT" type="EndReadingMDStore">
+    <DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
+    <PARAMETERS/>
+    <ARCS>
+        <ARC to="failure"/>
+    </ARCS>
+</NODE>
 
 <NODE name="UPDATE_INFO" type="MDStoreToApiExtraFieldHadoop">
 	<DESCRIPTION>Update datasouce API extra fields</DESCRIPTION>
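
The workflow now has an explicit failure path: TRANSFORM_HADOOP can follow either its default arc to COMMIT_VERSION or the arc named "abort" to ABORT_VERSION, which discards the mdstore version and ends the read transaction before routing to "failure". In the Sarasvati-based node model used here, the string returned by execute() appears to select the outgoing arc (both Java nodes in this revision return Arc.DEFAULT_ARC). A minimal sketch of how a submit node could drive the new arc; this is illustrative only, not the actual SubmitDnetHadoopJobNode code:

	package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;

	import com.googlecode.sarasvati.Arc;
	import com.googlecode.sarasvati.NodeToken;

	import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;

	// Hypothetical node: follows the "abort" arc when the Hadoop job fails.
	public class SubmitJobWithAbortSketch extends SimpleJobNode {

		@Override
		protected String execute(final NodeToken token) throws Exception {
			final boolean succeeded = runHadoopJob(token); // placeholder for the real submission
			if (!succeeded) {
				token.getEnv().setAttribute("errorMessage", "transformation job failed");
				return "abort"; // matches <ARC name="abort" to="ABORT_VERSION"/>
			}
			return Arc.DEFAULT_ARC; // matches the unnamed <ARC to="COMMIT_VERSION"/>
		}

		private boolean runHadoopJob(final NodeToken token) {
			// Submission logic omitted; out of scope for this sketch.
			return true;
		}
	}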
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml

 		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.EndReadingMDStoreJobNode"
 		p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" />
 		
-	<bean id="wfNodeTransformHadoop"
-		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.TransformHadoopJobNode"
-		scope="prototype" />
-		
+
 	<bean id="wfNodePrepareEnvTransformHadoopJobNode"
 		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.PrepareEnvTransformHadoopJobNode"
 		scope="prototype" />
