Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2
import java.util.Map;
3
import java.util.Optional;
4
import java.util.stream.Collectors;
5

    
6
import eu.dnetlib.dhp.model.mdstore.Provenance;
7
import eu.dnetlib.enabling.datasources.common.Datasource;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.springframework.beans.factory.annotation.Autowired;
11

    
12
import com.google.gson.Gson;
13
import com.googlecode.sarasvati.Arc;
14
import com.googlecode.sarasvati.NodeToken;
15

    
16
import eu.dnetlib.collector.worker.model.ApiDescriptor;
17
import eu.dnetlib.enabling.datasources.common.ApiParam;
18
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
19
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
20
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
21

    
22
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
23

    
24
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
25

    
26
    @Autowired
27
    private LocalDatasourceManager<?, ?> dsManager;
28

    
29
    private String hdfsBasePath;
30

    
31
    @Override
32
    protected String execute(final NodeToken token) throws Exception {
33

    
34
        // param 1 : hdfs path
35
        // param 2 : api descriptor (json)
36
        // param 3 : nameNode
37

    
38
        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
39
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
40
        final String mdId = token.getEnv().getAttribute("mdId");
41
        final String versionId = token.getEnv().getAttribute("versionId");
42

    
43
        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
44
                .stream()
45
                .filter(a -> a.getId().equals(apiId))
46
                .map(a -> {
47
                    final ApiDescriptor res = new ApiDescriptor();
48
                    res.setBaseUrl(a.getBaseurl());
49
                    res.setId(a.getId());
50
                    res.setProtocol(a.getProtocol());
51
                    res.getParams().put("metadata_identifier_path", a.getMetadataIdentifierPath());
52
                    res.getParams().putAll(a.getApiParams()
53
                            .stream()
54
                            .map(o -> (ApiParam) o)
55
                            .collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
56
                    return res;
57
                })
58
                .findFirst();
59

    
60
        if (opt.isPresent()) {
61
            final ApiDescriptor api = opt.get();
62
            final String hdfsPath = String.format("%s/%s/%s/store", hdfsBasePath, mdId, versionId);
63
            final String seqFilePath = String.format("%s/%s/%s/seqFile", hdfsBasePath, mdId, versionId);
64
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
65
            token.getEnv().setAttribute("mdStorePath", hdfsPath);
66
            token.getEnv().setAttribute("sequenceFilePath", seqFilePath);
67
            final Provenance provenance = new Provenance();
68
            provenance.setDatasourceId(dsId);
69
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
70
            provenance.setDatasourceName(ds.getOfficialname());
71
            provenance.setNsPrefix(ds.getNamespaceprefix());
72
            token.getEnv().setAttribute("dataSourceInfo", new Gson().toJson(provenance));
73
            token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
74
            token.getEnv().setAttribute("identifierPath",api.getParams().get("metadata_identifier_path"));
75
            token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
76

    
77
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
78
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + "protocol", api.getProtocol());
79
            final Map<String, String> params = api.getParams();
80
            if (params != null) {
81
                for(Map.Entry<String, String> e : params.entrySet()) {
82
                    token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + e.getKey(), e.getValue());
83
                }
84
            }
85

    
86
            return Arc.DEFAULT_ARC;
87
        } else {
88
            return "abort";
89
        }
90

    
91
    }
92

    
93
    public String getHdfsBasePath() {
94
        return hdfsBasePath;
95
    }
96

    
97
    public void setHdfsBasePath(String hdfsBasePath) {
98
        this.hdfsBasePath = hdfsBasePath;
99
    }
100
}
(7-7/10)