Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
2

    
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

    
6
import com.googlecode.sarasvati.Arc;
7
import com.googlecode.sarasvati.NodeToken;
8

    
9
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
10

    
11
public class PrepareEnvTransformHadoopJobNode extends SimpleJobNode {
12

    
13
	private static final Log log = LogFactory.getLog(PrepareEnvTransformHadoopJobNode.class);
14

    
15
	private String hdfsBasePath;
16

    
17
	@Override
18
	protected String execute(final NodeToken token) throws Exception {
19

    
20
		final String reading_mdId = token.getEnv().getAttribute("reading_mdId");
21
		final String reading_versionId = token.getEnv().getAttribute("reading_versionId");
22

    
23
		final String writing_mdId = token.getEnv().getAttribute("mdId");
24
		final String writing_versionId = token.getEnv().getAttribute("versionId");
25

    
26
		final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId);
27
		final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId);
28

    
29
		token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
30
		token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));
31
		token.getEnv().setAttribute("hdfsNativePath", hdfsNativePath);
32
		token.getEnv().setAttribute("hdfsTransformedPath", hdfsTransformedPath);
33
		return Arc.DEFAULT_ARC;
34

    
35
	}
36

    
37
	public String getHdfsBasePath() {
38
		return hdfsBasePath;
39
	}
40

    
41
	public void setHdfsBasePath(final String hdfsBasePath) {
42
		this.hdfsBasePath = hdfsBasePath;
43
	}
44
}
(8-8/12)