Project

General

Profile

1 57299 sandro.lab
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2
import java.util.Map;
3
import java.util.Optional;
4
import java.util.stream.Collectors;
5
6
import eu.dnetlib.dhp.model.mdstore.Provenance;
7
import eu.dnetlib.enabling.datasources.common.Datasource;
8 60316 claudio.at
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
9 57299 sandro.lab
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.springframework.beans.factory.annotation.Autowired;
12
13
import com.google.gson.Gson;
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16
17
import eu.dnetlib.collector.worker.model.ApiDescriptor;
18
import eu.dnetlib.enabling.datasources.common.ApiParam;
19
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
20
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
21
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
22 60318 claudio.at
import org.springframework.beans.factory.annotation.Value;
23 57299 sandro.lab
24
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
25
26
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
27 60316 claudio.at
    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
28 57299 sandro.lab
29
    @Autowired
30
    private LocalDatasourceManager<?, ?> dsManager;
31
32 60316 claudio.at
    /**
33
     * MDStore identifier
34
     */
35
    private String mdId;
36 57299 sandro.lab
37 60316 claudio.at
    /**
38
     * REFRESH | INCREMENTAL
39
     */
40
    private String collectionMode;
41
42 60321 claudio.at
    /**
43
     * XML | JSON, default = XML
44
     */
45
    private String metadataEncoding = "XML";
46
47 60318 claudio.at
    @Value("${dnet.openaire.dhp.collection.app.path}")
48
    private String oozieWfPath;
49
50 57299 sandro.lab
    @Override
51
    protected String execute(final NodeToken token) throws Exception {
52
53
        // param 1 : hdfs path
54
        // param 2 : api descriptor (json)
55
        // param 3 : nameNode
56
57
        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
58 60316 claudio.at
        log.info("dsId: " + dsId);
59 57299 sandro.lab
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
60 60316 claudio.at
        log.info("apiId: " + apiId);
61 57299 sandro.lab
62
        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
63
                .stream()
64
                .filter(a -> a.getId().equals(apiId))
65
                .map(a -> {
66
                    final ApiDescriptor res = new ApiDescriptor();
67
                    res.setBaseUrl(a.getBaseurl());
68
                    res.setId(a.getId());
69
                    res.setProtocol(a.getProtocol());
70 60316 claudio.at
                    res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
71 57299 sandro.lab
                    res.getParams().putAll(a.getApiParams()
72
                            .stream()
73
                            .map(o -> (ApiParam) o)
74
                            .collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
75
                    return res;
76
                })
77
                .findFirst();
78
79
        if (opt.isPresent()) {
80 60316 claudio.at
            token.getEnv().setAttribute("mdId", getMdId());
81
            token.getEnv().setAttribute("collectionMode", getCollectionMode());
82 60321 claudio.at
            token.getEnv().setAttribute("metadataEncoding", getMetadataEncoding());
83 60318 claudio.at
            token.getEnv().setAttribute("oozieWfPath", getOozieWfPath());
84 60316 claudio.at
85 57299 sandro.lab
            final ApiDescriptor api = opt.get();
86
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
87 60316 claudio.at
88 57299 sandro.lab
            final Provenance provenance = new Provenance();
89
            provenance.setDatasourceId(dsId);
90
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
91
            provenance.setDatasourceName(ds.getOfficialname());
92
            provenance.setNsPrefix(ds.getNamespaceprefix());
93 60316 claudio.at
            final String dsProvenance = new Gson().toJson(provenance);
94
            log.info("datasource provenance: " + dsProvenance);
95
96
            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
97 57299 sandro.lab
            token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
98 60316 claudio.at
            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
99 57299 sandro.lab
            token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
100
101
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
102
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + "protocol", api.getProtocol());
103
            final Map<String, String> params = api.getParams();
104
            if (params != null) {
105
                for(Map.Entry<String, String> e : params.entrySet()) {
106
                    token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + e.getKey(), e.getValue());
107
                }
108
            }
109
110
            return Arc.DEFAULT_ARC;
111
        } else {
112 60316 claudio.at
            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
113 57299 sandro.lab
        }
114 60316 claudio.at
    }
115 57299 sandro.lab
116 60316 claudio.at
    public String getMdId() {
117
        return mdId;
118 57299 sandro.lab
    }
119
120 60316 claudio.at
    public void setMdId(String mdId) {
121
        this.mdId = mdId;
122 57299 sandro.lab
    }
123
124 60316 claudio.at
    public String getCollectionMode() {
125
        return collectionMode;
126 57299 sandro.lab
    }
127 60316 claudio.at
128
    public void setCollectionMode(String collectionMode) {
129
        this.collectionMode = collectionMode;
130
    }
131 60318 claudio.at
132
    public String getOozieWfPath() {
133
        return oozieWfPath;
134
    }
135
136
    public void setOozieWfPath(String oozieWfPath) {
137
        this.oozieWfPath = oozieWfPath;
138
    }
139 60321 claudio.at
140
    public String getMetadataEncoding() {
141
        return metadataEncoding;
142
    }
143
144
    public void setMetadataEncoding(String metadataEncoding) {
145
        this.metadataEncoding = metadataEncoding;
146
    }
147 57299 sandro.lab
}