Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
2

    
3
import eu.dnetlib.dhp.utils.DHPUtils;
4
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
5
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.NodeToken;
11

    
12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13
import org.springframework.beans.factory.annotation.Autowired;
14

    
15
public class PrepareEnvTransformHadoopJobNode extends SimpleJobNode {
16

    
17
	private static final Log log = LogFactory.getLog(PrepareEnvTransformHadoopJobNode.class);
18

    
19
	private String hdfsBasePath;
20

    
21
	private String ruleId;
22

    
23
	@Autowired
24
	private UniqueServiceLocator serviceLocator;
25

    
26
	@Override
27
	protected String execute(final NodeToken token) throws Exception {
28

    
29
		final String reading_mdId = token.getEnv().getAttribute("reading_mdId");
30
		final String reading_versionId = token.getEnv().getAttribute("reading_versionId");
31

    
32
		final String writing_mdId = token.getEnv().getAttribute("mdId");
33
		final String writing_versionId = token.getEnv().getAttribute("versionId");
34

    
35
		final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId);
36
		final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId);
37

    
38

    
39
		final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId());
40

    
41
		token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
42
		token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));
43
		token.getEnv().setAttribute("mdstoreInputPath", hdfsNativePath);
44
		token.getEnv().setAttribute("mdstoreOutputPath", hdfsTransformedPath);
45
		token.getEnv().setAttribute("transformationRule", DHPUtils.compressString(trRule));
46
		return Arc.DEFAULT_ARC;
47

    
48
	}
49

    
50
	public String getHdfsBasePath() {
51
		return hdfsBasePath;
52
	}
53

    
54
	public void setHdfsBasePath(final String hdfsBasePath) {
55
		this.hdfsBasePath = hdfsBasePath;
56
	}
57

    
58
	public String getRuleId() {
59
		return ruleId;
60
	}
61

    
62
	public void setRuleId(String ruleId) {
63
		this.ruleId = ruleId;
64
	}
65

    
66
	public UniqueServiceLocator getServiceLocator() {
67
		return serviceLocator;
68
	}
69

    
70
	public void setServiceLocator(UniqueServiceLocator serviceLocator) {
71
		this.serviceLocator = serviceLocator;
72
	}
73
}
(8-8/11)