Revision 60316
Added by Claudio Atzori about 3 years ago
PrepareEnvTransformHadoopJobNode.java | ||
---|---|---|
16 | 16 |
|
17 | 17 |
private static final Log log = LogFactory.getLog(PrepareEnvTransformHadoopJobNode.class); |
18 | 18 |
|
19 |
private String hdfsBasePath; |
|
20 |
|
|
21 | 19 |
private String ruleId; |
22 | 20 |
|
23 | 21 |
@Autowired |
24 | 22 |
private UniqueServiceLocator serviceLocator; |
25 | 23 |
|
24 |
private String mdstoreInput; |
|
25 |
|
|
26 |
private String mdstoreOutput; |
|
27 |
|
|
26 | 28 |
@Override |
27 | 29 |
protected String execute(final NodeToken token) throws Exception { |
28 | 30 |
|
29 |
final String reading_mdId = token.getEnv().getAttribute("reading_mdId"); |
|
30 |
final String reading_versionId = token.getEnv().getAttribute("reading_versionId"); |
|
31 |
|
|
32 |
final String writing_mdId = token.getEnv().getAttribute("mdId"); |
|
33 |
final String writing_versionId = token.getEnv().getAttribute("versionId"); |
|
34 |
|
|
35 |
final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId); |
|
36 |
final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId); |
|
37 |
|
|
38 |
|
|
39 | 31 |
final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId()); |
40 | 32 |
|
41 | 33 |
token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis()); |
42 | 34 |
token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId")); |
43 |
token.getEnv().setAttribute("mdstoreInputPath", hdfsNativePath);
|
|
44 |
token.getEnv().setAttribute("mdstoreOutputPath", hdfsTransformedPath);
|
|
35 |
token.getEnv().setAttribute("mdstoreInput", getMdstoreInput());
|
|
36 |
token.getEnv().setAttribute("mdstoreOutput", getMdstoreOutput());
|
|
45 | 37 |
token.getEnv().setAttribute("transformationRule", DHPUtils.compressString(trRule)); |
46 | 38 |
return Arc.DEFAULT_ARC; |
47 | 39 |
|
48 | 40 |
} |
49 | 41 |
|
50 |
public String getHdfsBasePath() { |
|
51 |
return hdfsBasePath; |
|
52 |
} |
|
53 |
|
|
54 |
public void setHdfsBasePath(final String hdfsBasePath) { |
|
55 |
this.hdfsBasePath = hdfsBasePath; |
|
56 |
} |
|
57 |
|
|
58 | 42 |
public String getRuleId() { |
59 | 43 |
return ruleId; |
60 | 44 |
} |
... | ... | |
70 | 54 |
public void setServiceLocator(UniqueServiceLocator serviceLocator) { |
71 | 55 |
this.serviceLocator = serviceLocator; |
72 | 56 |
} |
57 |
|
|
58 |
public String getMdstoreInput() { |
|
59 |
return mdstoreInput; |
|
60 |
} |
|
61 |
|
|
62 |
public void setMdstoreInput(String mdstoreInput) { |
|
63 |
this.mdstoreInput = mdstoreInput; |
|
64 |
} |
|
65 |
|
|
66 |
public String getMdstoreOutput() { |
|
67 |
return mdstoreOutput; |
|
68 |
} |
|
69 |
|
|
70 |
public void setMdstoreOutput(String mdstoreOutput) { |
|
71 |
this.mdstoreOutput = mdstoreOutput; |
|
72 |
} |
|
73 | 73 |
} |
Also available in: Unified diff
updated hadoop-specific aggregation workflows