Revision 55281
Added by Sandro La Bruzzo about 5 years ago
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/TransformHadoopJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop; |
|
2 |
|
|
3 |
import org.apache.commons.logging.Log; |
|
4 |
import org.apache.commons.logging.LogFactory; |
|
5 |
|
|
6 |
import com.googlecode.sarasvati.Arc; |
|
7 |
import com.googlecode.sarasvati.NodeToken; |
|
8 |
|
|
9 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
|
10 |
|
|
11 |
public class TransformHadoopJobNode extends SimpleJobNode { |
|
12 |
|
|
13 |
private static final Log log = LogFactory.getLog(TransformHadoopJobNode.class); |
|
14 |
|
|
15 |
private String ruleId; |
|
16 |
|
|
17 |
@Override |
|
18 |
protected String execute(final NodeToken token) throws Exception { |
|
19 |
// param 1 : hdfs path from |
|
20 |
// param 2 : hdfs path to |
|
21 |
// param 3 : transformation rule |
|
22 |
// param 4 : nameNode |
|
23 |
|
|
24 |
final String reading_mdId = token.getEnv().getAttribute("reading_mdId"); |
|
25 |
final String reading_versionId = token.getEnv().getAttribute("reading_versionId"); |
|
26 |
|
|
27 |
final String writing_mdId = token.getEnv().getAttribute("mdId"); |
|
28 |
final String writing_versionId = token.getEnv().getAttribute("versionId"); |
|
29 |
|
|
30 |
final String readingHdfsPath = "hdfs://mdstores/" + reading_mdId + "/" + reading_versionId; |
|
31 |
final String writingHdfsPath = "hdfs://mdstores/" + writing_mdId + "/" + writing_versionId; |
|
32 |
|
|
33 |
final String nameNode = "TransformHadoop"; |
|
34 |
|
|
35 |
// TODO |
|
36 |
|
|
37 |
log.info("Transformation rule: " + ruleId); |
|
38 |
log.info("HDFS PATH for reading: " + readingHdfsPath); |
|
39 |
log.info("HDFS PATH for writing: " + writingHdfsPath); |
|
40 |
log.info("NameNode: " + nameNode); |
|
41 |
|
|
42 |
token.getEnv().setAttribute("mdStoreSize", 1234); |
|
43 |
|
|
44 |
return Arc.DEFAULT_ARC; |
|
45 |
} |
|
46 |
|
|
47 |
public String getRuleId() { |
|
48 |
return ruleId; |
|
49 |
} |
|
50 |
|
|
51 |
public void setRuleId(final String ruleId) { |
|
52 |
this.ruleId = ruleId; |
|
53 |
} |
|
54 |
} |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/PrepareEnvTransformHadoopJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop; |
2 | 2 |
|
3 |
import eu.dnetlib.dhp.utils.DHPUtils; |
|
4 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
5 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
3 | 6 |
import org.apache.commons.logging.Log; |
4 | 7 |
import org.apache.commons.logging.LogFactory; |
5 | 8 |
|
... | ... | |
7 | 10 |
import com.googlecode.sarasvati.NodeToken; |
8 | 11 |
|
9 | 12 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
13 |
import org.springframework.beans.factory.annotation.Autowired; |
|
10 | 14 |
|
11 | 15 |
public class PrepareEnvTransformHadoopJobNode extends SimpleJobNode { |
12 | 16 |
|
... | ... | |
14 | 18 |
|
15 | 19 |
private String hdfsBasePath; |
16 | 20 |
|
21 |
private String ruleId; |
|
22 |
|
|
23 |
@Autowired |
|
24 |
private UniqueServiceLocator serviceLocator; |
|
25 |
|
|
17 | 26 |
@Override |
18 | 27 |
protected String execute(final NodeToken token) throws Exception { |
19 | 28 |
|
... | ... | |
26 | 35 |
final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId); |
27 | 36 |
final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId); |
28 | 37 |
|
38 |
|
|
39 |
final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId()); |
|
40 |
|
|
29 | 41 |
token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis()); |
30 | 42 |
token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId")); |
31 |
token.getEnv().setAttribute("hdfsNativePath", hdfsNativePath); |
|
32 |
token.getEnv().setAttribute("hdfsTransformedPath", hdfsTransformedPath); |
|
43 |
token.getEnv().setAttribute("mdstoreInputPath", hdfsNativePath); |
|
44 |
token.getEnv().setAttribute("mdstoreOutputPath", hdfsTransformedPath); |
|
45 |
token.getEnv().setAttribute("transformationRule", DHPUtils.compressString(trRule)); |
|
33 | 46 |
return Arc.DEFAULT_ARC; |
34 | 47 |
|
35 | 48 |
} |
... | ... | |
41 | 54 |
public void setHdfsBasePath(final String hdfsBasePath) { |
42 | 55 |
this.hdfsBasePath = hdfsBasePath; |
43 | 56 |
} |
57 |
|
|
58 |
public String getRuleId() { |
|
59 |
return ruleId; |
|
60 |
} |
|
61 |
|
|
62 |
public void setRuleId(String ruleId) { |
|
63 |
this.ruleId = ruleId; |
|
64 |
} |
|
65 |
|
|
66 |
public UniqueServiceLocator getServiceLocator() { |
|
67 |
return serviceLocator; |
|
68 |
} |
|
69 |
|
|
70 |
public void setServiceLocator(UniqueServiceLocator serviceLocator) { |
|
71 |
this.serviceLocator = serviceLocator; |
|
72 |
} |
|
44 | 73 |
} |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st | ||
---|---|---|
41 | 41 |
<DESCRIPTION>Start the Hadoop Job</DESCRIPTION> |
42 | 42 |
<PARAMETERS> |
43 | 43 |
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopCollection</PARAM> |
44 |
<PARAM managedBy="system" name="cluster" required="true" type="string">DHP</PARAM>
|
|
44 |
<PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
|
|
45 | 45 |
<PARAM managedBy="system" name="envParams" required="true" type="string"> |
46 | 46 |
{ |
47 | 47 |
"apiDescription":"apiDescription", |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/transform.wf.st | ||
---|---|---|
36 | 36 |
<PARAM required="true" type="string" name="mdId" managedBy="system">$params.("tran_id")$</PARAM> |
37 | 37 |
</PARAMETERS> |
38 | 38 |
<ARCS> |
39 |
<ARC to="PREPARE_ENV_TRANSFORM"/>
|
|
40 |
</ARCS>
|
|
39 |
<ARC to="PREPARE_ENV_TRANSFORMATION"/>
|
|
40 |
</ARCS>
|
|
41 | 41 |
</NODE> |
42 |
|
|
43 |
<NODE name="PREPARE_ENV_TRANSFORM" type="PrepareEnvTransformHadoopJobNode"> |
|
44 |
<DESCRIPTION>Put in the environment all the variable needed to the transform oozie job </DESCRIPTION> |
|
42 |
<NODE name="PREPARE_ENV_TRANSFORMATION" type="PrepareEnvTransformHadoopJobNode"> |
|
43 |
<DESCRIPTION>Retrieve all the parameters needed to run the transformation workflow</DESCRIPTION> |
|
45 | 44 |
<PARAMETERS> |
45 |
<PARAM category="TRANSFORMATION_RULE_ID" function="listProfiles('TransformationRuleDSResourceType', '//TITLE')" managedBy="user" name="ruleId" required="true" type="string"></PARAM> |
|
46 | 46 |
<PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"></PARAM> |
47 | 47 |
</PARAMETERS> |
48 | 48 |
<ARCS> |
49 | 49 |
<ARC to="TRANSFORM_HADOOP"/> |
50 | 50 |
</ARCS> |
51 | 51 |
</NODE> |
52 |
|
|
53 |
|
|
54 | 52 |
<NODE name="TRANSFORM_HADOOP" type="SubmitDnetHadoopJobNode"> |
55 |
<DESCRIPTION>Transform original records</DESCRIPTION> |
|
56 |
<PARAMETERS> |
|
57 |
<PARAM required="true" type="string" name="ruleId" category="TRANSFORMATION_RULE_ID" managedBy="user" function="listProfiles('TransformationRuleDSResourceType', '//TITLE')"></PARAM> |
|
58 |
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopCollection</PARAM> |
|
59 |
<PARAM managedBy="system" name="cluster" required="true" type="string">DHP</PARAM> |
|
53 |
<DESCRIPTION>Start the Hadoop Job</DESCRIPTION> |
|
54 |
<PARAMETERS> |
|
55 |
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopTrasnformation</PARAM> |
|
56 |
<PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM> |
|
60 | 57 |
<PARAM managedBy="system" name="envParams" required="true" type="string"> |
61 | 58 |
{ |
62 |
"hdfsNativePath":"hdfsNativePath", |
|
63 |
"hdfsTransformedPath":"hdfsTransformedPath", |
|
59 |
"mdstoreInputPath":"mdstoreInputPath", |
|
60 |
"mdstoreOutputPath":"mdstoreOutputPath", |
|
61 |
"transformationRule":"transformationRule", |
|
64 | 62 |
"timestamp":"timestamp", |
65 | 63 |
"workflowId":"workflowId" |
66 | 64 |
} |
67 | 65 |
</PARAM> |
68 |
</PARAMETERS> |
|
69 |
<ARCS> |
|
70 |
<ARC to="COMMIT_VERSION"/> |
|
71 |
</ARCS> |
|
66 |
</PARAMETERS> |
|
67 |
<ARCS> |
|
68 |
<ARC to="COMMIT_VERSION"/> |
|
69 |
<ARC name="abort" to="ABORT_VERSION"/> |
|
70 |
</ARCS> |
|
72 | 71 |
</NODE> |
73 |
|
|
74 | 72 |
<NODE name="COMMIT_VERSION" type="CommitMDStoreVersion"> |
75 |
<DESCRIPTION>Commit the mdstore version</DESCRIPTION>
|
|
76 |
<PARAMETERS/>
|
|
77 |
<ARCS>
|
|
78 |
<ARC to="END_READING"/>
|
|
79 |
</ARCS>
|
|
73 |
<DESCRIPTION>Commit the mdstore version</DESCRIPTION>
|
|
74 |
<PARAMETERS/>
|
|
75 |
<ARCS>
|
|
76 |
<ARC to="END_READING"/>
|
|
77 |
</ARCS>
|
|
80 | 78 |
</NODE> |
81 |
|
|
82 | 79 |
<NODE name="END_READING" type="EndReadingMDStore"> |
83 |
<DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
|
|
84 |
<PARAMETERS/>
|
|
85 |
<ARCS>
|
|
86 |
<ARC to="UPDATE_INFO"/>
|
|
87 |
</ARCS>
|
|
80 |
<DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
|
|
81 |
<PARAMETERS/>
|
|
82 |
<ARCS>
|
|
83 |
<ARC to="UPDATE_INFO"/>
|
|
84 |
</ARCS>
|
|
88 | 85 |
</NODE> |
86 |
<NODE name="ABORT_VERSION" type="AbortMDStoreVersion"> |
|
87 |
<DESCRIPTION>Abort the mdstore version</DESCRIPTION> |
|
88 |
<PARAMETERS/> |
|
89 |
<ARCS> |
|
90 |
<ARC to="END_READING_ABORT"/> |
|
91 |
</ARCS> |
|
92 |
</NODE> |
|
93 |
<NODE name="END_READING_ABORT" type="EndReadingMDStore"> |
|
94 |
<DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION> |
|
95 |
<PARAMETERS/> |
|
96 |
<ARCS> |
|
97 |
<ARC to="failure"/> |
|
98 |
</ARCS> |
|
99 |
</NODE> |
|
89 | 100 |
|
90 | 101 |
<NODE name="UPDATE_INFO" type="MDStoreToApiExtraFieldHadoop"> |
91 | 102 |
<DESCRIPTION>Update datasouce API extra fields</DESCRIPTION> |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml | ||
---|---|---|
401 | 401 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.EndReadingMDStoreJobNode" |
402 | 402 |
p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" /> |
403 | 403 |
|
404 |
<bean id="wfNodeTransformHadoop" |
|
405 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.TransformHadoopJobNode" |
|
406 |
scope="prototype" /> |
|
407 |
|
|
404 |
|
|
408 | 405 |
<bean id="wfNodePrepareEnvTransformHadoopJobNode" |
409 | 406 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.PrepareEnvTransformHadoopJobNode" |
410 | 407 |
scope="prototype" /> |
Also available in: Unified diff
Implemented Hadoop Transformation