Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2
import java.util.Map;
3
import java.util.Optional;
4
import java.util.stream.Collectors;
5

    
6
import eu.dnetlib.dhp.model.mdstore.Provenance;
7
import eu.dnetlib.enabling.datasources.common.Datasource;
8
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.springframework.beans.factory.annotation.Autowired;
12

    
13
import com.google.gson.Gson;
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16

    
17
import eu.dnetlib.collector.worker.model.ApiDescriptor;
18
import eu.dnetlib.enabling.datasources.common.ApiParam;
19
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
20
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
21
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
22

    
23
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
24

    
25
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
26
    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
27

    
28
    @Autowired
29
    private LocalDatasourceManager<?, ?> dsManager;
30

    
31
    /**
32
     * MDStore identifier
33
     */
34
    private String mdId;
35

    
36
    /**
37
     * REFRESH | INCREMENTAL
38
     */
39
    private String collectionMode;
40

    
41
    @Override
42
    protected String execute(final NodeToken token) throws Exception {
43

    
44
        // param 1 : hdfs path
45
        // param 2 : api descriptor (json)
46
        // param 3 : nameNode
47

    
48
        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
49
        log.info("dsId: " + dsId);
50
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
51
        log.info("apiId: " + apiId);
52

    
53
        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
54
                .stream()
55
                .filter(a -> a.getId().equals(apiId))
56
                .map(a -> {
57
                    final ApiDescriptor res = new ApiDescriptor();
58
                    res.setBaseUrl(a.getBaseurl());
59
                    res.setId(a.getId());
60
                    res.setProtocol(a.getProtocol());
61
                    res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
62
                    res.getParams().putAll(a.getApiParams()
63
                            .stream()
64
                            .map(o -> (ApiParam) o)
65
                            .collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
66
                    return res;
67
                })
68
                .findFirst();
69

    
70
        if (opt.isPresent()) {
71
            token.getEnv().setAttribute("mdId", getMdId());
72
            token.getEnv().setAttribute("collectionMode", getCollectionMode());
73

    
74
            final ApiDescriptor api = opt.get();
75
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
76

    
77
            final Provenance provenance = new Provenance();
78
            provenance.setDatasourceId(dsId);
79
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
80
            provenance.setDatasourceName(ds.getOfficialname());
81
            provenance.setNsPrefix(ds.getNamespaceprefix());
82
            final String dsProvenance = new Gson().toJson(provenance);
83
            log.info("datasource provenance: " + dsProvenance);
84

    
85
            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
86
            token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
87
            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
88
            token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
89

    
90
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
91
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + "protocol", api.getProtocol());
92
            final Map<String, String> params = api.getParams();
93
            if (params != null) {
94
                for(Map.Entry<String, String> e : params.entrySet()) {
95
                    token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + e.getKey(), e.getValue());
96
                }
97
            }
98

    
99
            return Arc.DEFAULT_ARC;
100
        } else {
101
            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
102
        }
103
    }
104

    
105
    public String getMdId() {
106
        return mdId;
107
    }
108

    
109
    public void setMdId(String mdId) {
110
        this.mdId = mdId;
111
    }
112

    
113
    public String getCollectionMode() {
114
        return collectionMode;
115
    }
116

    
117
    public void setCollectionMode(String collectionMode) {
118
        this.collectionMode = collectionMode;
119
    }
120
}
(7-7/11)