Project

General

Profile

« Previous | Next » 

Revision 57333

mereged transformation Job spark

View differences:

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvTransformHadoopJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

  
3
import eu.dnetlib.dhp.utils.DHPUtils;
4
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
5
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.NodeToken;
11

  
12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13
import org.springframework.beans.factory.annotation.Autowired;
14

  
15
public class PrepareEnvTransformHadoopJobNode extends SimpleJobNode {
16

  
17
    private static final Log log = LogFactory.getLog(PrepareEnvTransformHadoopJobNode.class);
18

  
19
    private String hdfsBasePath;
20

  
21
    private String ruleId;
22

  
23
    @Autowired
24
    private UniqueServiceLocator serviceLocator;
25

  
26
    @Override
27
    protected String execute(final NodeToken token) throws Exception {
28

  
29
        final String reading_mdId = token.getEnv().getAttribute("reading_mdId");
30
        final String reading_versionId = token.getEnv().getAttribute("reading_versionId");
31

  
32
        final String writing_mdId = token.getEnv().getAttribute("mdId");
33
        final String writing_versionId = token.getEnv().getAttribute("versionId");
34

  
35
        final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId);
36
        final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId);
37

  
38

  
39
        final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId());
40

  
41
        token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
42
        token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));
43
        token.getEnv().setAttribute("mdstoreInputPath", hdfsNativePath);
44
        token.getEnv().setAttribute("mdstoreOutputPath", hdfsTransformedPath);
45
        token.getEnv().setAttribute("transformationRule", DHPUtils.compressString(trRule));
46
        return Arc.DEFAULT_ARC;
47

  
48
    }
49

  
50
    public String getHdfsBasePath() {
51
        return hdfsBasePath;
52
    }
53

  
54
    public void setHdfsBasePath(final String hdfsBasePath) {
55
        this.hdfsBasePath = hdfsBasePath;
56
    }
57

  
58
    public String getRuleId() {
59
        return ruleId;
60
    }
61

  
62
    public void setRuleId(String ruleId) {
63
        this.ruleId = ruleId;
64
    }
65

  
66
    public UniqueServiceLocator getServiceLocator() {
67
        return serviceLocator;
68
    }
69

  
70
    public void setServiceLocator(UniqueServiceLocator serviceLocator) {
71
        this.serviceLocator = serviceLocator;
72
    }
73
}
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml
410 410
          class="eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.EndReadingMDStoreJobNode"
411 411
          p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" />
412 412

  
413
    <bean id="wfNodePrepareEnvTransformHadoopJobNode"
414
          class="eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.PrepareEnvTransformHadoopJobNode"
415
          scope="prototype" />
413 416

  
414 417

  
415 418

  
419

  
416 420
    <!-- Aggreagation with Hadoop
417 421

  
418 422
    <bean id="wfNodePrepareEnvTransformHadoopJobNode"

Also available in: Unified diff