Project

General

Profile

« Previous | Next » 

Revision 60316

updated hadoop-specific aggregation workflows

View differences:

PrepareEnvCollectHadoopJobNode.java
5 5

  
6 6
import eu.dnetlib.dhp.model.mdstore.Provenance;
7 7
import eu.dnetlib.enabling.datasources.common.Datasource;
8
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
8 9
import org.apache.commons.logging.Log;
9 10
import org.apache.commons.logging.LogFactory;
10 11
import org.springframework.beans.factory.annotation.Autowired;
......
22 23
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
23 24

  
24 25
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
26
    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
25 27

  
26 28
    @Autowired
27 29
    private LocalDatasourceManager<?, ?> dsManager;
28 30

  
29
    private String hdfsBasePath;
31
    /**
32
     * MDStore identifier
33
     */
34
    private String mdId;
30 35

  
36
    /**
37
     * REFRESH | INCREMENTAL
38
     */
39
    private String collectionMode;
40

  
31 41
    @Override
32 42
    protected String execute(final NodeToken token) throws Exception {
33 43

  
......
36 46
        // param 3 : nameNode
37 47

  
38 48
        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
49
        log.info("dsId: " + dsId);
39 50
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
40
        final String mdId = token.getEnv().getAttribute("mdId");
41
        final String versionId = token.getEnv().getAttribute("versionId");
51
        log.info("apiId: " + apiId);
42 52

  
43 53
        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
44 54
                .stream()
......
48 58
                    res.setBaseUrl(a.getBaseurl());
49 59
                    res.setId(a.getId());
50 60
                    res.setProtocol(a.getProtocol());
51
                    res.getParams().put("metadata_identifier_path", a.getMetadataIdentifierPath());
61
                    res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
52 62
                    res.getParams().putAll(a.getApiParams()
53 63
                            .stream()
54 64
                            .map(o -> (ApiParam) o)
......
58 68
                .findFirst();
59 69

  
60 70
        if (opt.isPresent()) {
71
            token.getEnv().setAttribute("mdId", getMdId());
72
            token.getEnv().setAttribute("collectionMode", getCollectionMode());
73

  
61 74
            final ApiDescriptor api = opt.get();
62
            final String hdfsPath = String.format("%s/%s/%s/store", hdfsBasePath, mdId, versionId);
63
            final String seqFilePath = String.format("%s/%s/%s/seqFile", hdfsBasePath, mdId, versionId);
64 75
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
65
            token.getEnv().setAttribute("mdStorePath", hdfsPath);
66
            token.getEnv().setAttribute("sequenceFilePath", seqFilePath);
76

  
67 77
            final Provenance provenance = new Provenance();
68 78
            provenance.setDatasourceId(dsId);
69 79
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
70 80
            provenance.setDatasourceName(ds.getOfficialname());
71 81
            provenance.setNsPrefix(ds.getNamespaceprefix());
72
            token.getEnv().setAttribute("dataSourceInfo", new Gson().toJson(provenance));
82
            final String dsProvenance = new Gson().toJson(provenance);
83
            log.info("datasource provenance: " + dsProvenance);
84

  
85
            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
73 86
            token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
74
            token.getEnv().setAttribute("identifierPath",api.getParams().get("metadata_identifier_path"));
87
            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
75 88
            token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
76 89

  
77 90
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
......
85 98

  
86 99
            return Arc.DEFAULT_ARC;
87 100
        } else {
88
            return "abort";
101
            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
89 102
        }
103
    }
90 104

  
105
    public String getMdId() {
106
        return mdId;
91 107
    }
92 108

  
93
    public String getHdfsBasePath() {
94
        return hdfsBasePath;
109
    public void setMdId(String mdId) {
110
        this.mdId = mdId;
95 111
    }
96 112

  
97
    public void setHdfsBasePath(String hdfsBasePath) {
98
        this.hdfsBasePath = hdfsBasePath;
113
    public String getCollectionMode() {
114
        return collectionMode;
99 115
    }
116

  
117
    public void setCollectionMode(String collectionMode) {
118
        this.collectionMode = collectionMode;
119
    }
100 120
}

Also available in: Unified diff