Revision 55281
Added by Sandro La Bruzzo almost 5 years ago
PrepareEnvTransformHadoopJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop; |
2 | 2 |
|
3 |
import eu.dnetlib.dhp.utils.DHPUtils; |
|
4 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
5 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
3 | 6 |
import org.apache.commons.logging.Log; |
4 | 7 |
import org.apache.commons.logging.LogFactory; |
5 | 8 |
|
... | ... | |
7 | 10 |
import com.googlecode.sarasvati.NodeToken; |
8 | 11 |
|
9 | 12 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
13 |
import org.springframework.beans.factory.annotation.Autowired; |
|
10 | 14 |
|
11 | 15 |
public class PrepareEnvTransformHadoopJobNode extends SimpleJobNode { |
12 | 16 |
|
... | ... | |
14 | 18 |
|
15 | 19 |
private String hdfsBasePath; |
16 | 20 |
|
21 |
private String ruleId; |
|
22 |
|
|
23 |
@Autowired |
|
24 |
private UniqueServiceLocator serviceLocator; |
|
25 |
|
|
17 | 26 |
@Override |
18 | 27 |
protected String execute(final NodeToken token) throws Exception { |
19 | 28 |
|
... | ... | |
26 | 35 |
final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId); |
27 | 36 |
final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId); |
28 | 37 |
|
38 |
|
|
39 |
final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId()); |
|
40 |
|
|
29 | 41 |
token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis()); |
30 | 42 |
token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId")); |
31 |
token.getEnv().setAttribute("hdfsNativePath", hdfsNativePath); |
|
32 |
token.getEnv().setAttribute("hdfsTransformedPath", hdfsTransformedPath); |
|
43 |
token.getEnv().setAttribute("mdstoreInputPath", hdfsNativePath); |
|
44 |
token.getEnv().setAttribute("mdstoreOutputPath", hdfsTransformedPath); |
|
45 |
token.getEnv().setAttribute("transformationRule", DHPUtils.compressString(trRule)); |
|
33 | 46 |
return Arc.DEFAULT_ARC; |
34 | 47 |
|
35 | 48 |
} |
... | ... | |
41 | 54 |
public void setHdfsBasePath(final String hdfsBasePath) { |
42 | 55 |
this.hdfsBasePath = hdfsBasePath; |
43 | 56 |
} |
57 |
|
|
58 |
public String getRuleId() { |
|
59 |
return ruleId; |
|
60 |
} |
|
61 |
|
|
62 |
public void setRuleId(String ruleId) { |
|
63 |
this.ruleId = ruleId; |
|
64 |
} |
|
65 |
|
|
66 |
public UniqueServiceLocator getServiceLocator() { |
|
67 |
return serviceLocator; |
|
68 |
} |
|
69 |
|
|
70 |
public void setServiceLocator(UniqueServiceLocator serviceLocator) { |
|
71 |
this.serviceLocator = serviceLocator; |
|
72 |
} |
|
44 | 73 |
} |
Also available in: Unified diff
Implemented Hadoop Transformation