Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2
import java.util.Map;
3
import java.util.Optional;
4
import java.util.stream.Collectors;
5

    
6
import eu.dnetlib.dhp.model.mdstore.Provenance;
7
import eu.dnetlib.enabling.datasources.common.Datasource;
8
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.springframework.beans.factory.annotation.Autowired;
12

    
13
import com.google.gson.Gson;
14
import com.googlecode.sarasvati.Arc;
15
import com.googlecode.sarasvati.NodeToken;
16

    
17
import eu.dnetlib.collector.worker.model.ApiDescriptor;
18
import eu.dnetlib.enabling.datasources.common.ApiParam;
19
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
20
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
21
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
22
import org.springframework.beans.factory.annotation.Value;
23

    
24
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
25

    
26
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
27
    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
28

    
29
    @Autowired
30
    private LocalDatasourceManager<?, ?> dsManager;
31

    
32
    /**
33
     * MDStore identifier
34
     */
35
    private String mdId;
36

    
37
    /**
38
     * REFRESH | INCREMENTAL
39
     */
40
    private String collectionMode;
41

    
42
    @Value("${dnet.openaire.dhp.collection.app.path}")
43
    private String oozieWfPath;
44

    
45
    @Override
46
    protected String execute(final NodeToken token) throws Exception {
47

    
48
        // param 1 : hdfs path
49
        // param 2 : api descriptor (json)
50
        // param 3 : nameNode
51

    
52
        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
53
        log.info("dsId: " + dsId);
54
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
55
        log.info("apiId: " + apiId);
56

    
57
        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
58
                .stream()
59
                .filter(a -> a.getId().equals(apiId))
60
                .map(a -> {
61
                    final ApiDescriptor res = new ApiDescriptor();
62
                    res.setBaseUrl(a.getBaseurl());
63
                    res.setId(a.getId());
64
                    res.setProtocol(a.getProtocol());
65
                    res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
66
                    res.getParams().putAll(a.getApiParams()
67
                            .stream()
68
                            .map(o -> (ApiParam) o)
69
                            .collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
70
                    return res;
71
                })
72
                .findFirst();
73

    
74
        if (opt.isPresent()) {
75
            token.getEnv().setAttribute("mdId", getMdId());
76
            token.getEnv().setAttribute("collectionMode", getCollectionMode());
77
            token.getEnv().setAttribute("oozieWfPath", getOozieWfPath());
78

    
79
            final ApiDescriptor api = opt.get();
80
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
81

    
82
            final Provenance provenance = new Provenance();
83
            provenance.setDatasourceId(dsId);
84
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
85
            provenance.setDatasourceName(ds.getOfficialname());
86
            provenance.setNsPrefix(ds.getNamespaceprefix());
87
            final String dsProvenance = new Gson().toJson(provenance);
88
            log.info("datasource provenance: " + dsProvenance);
89

    
90
            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
91
            token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
92
            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
93
            token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
94

    
95
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
96
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + "protocol", api.getProtocol());
97
            final Map<String, String> params = api.getParams();
98
            if (params != null) {
99
                for(Map.Entry<String, String> e : params.entrySet()) {
100
                    token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + e.getKey(), e.getValue());
101
                }
102
            }
103

    
104
            return Arc.DEFAULT_ARC;
105
        } else {
106
            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
107
        }
108
    }
109

    
110
    public String getMdId() {
111
        return mdId;
112
    }
113

    
114
    public void setMdId(String mdId) {
115
        this.mdId = mdId;
116
    }
117

    
118
    public String getCollectionMode() {
119
        return collectionMode;
120
    }
121

    
122
    public void setCollectionMode(String collectionMode) {
123
        this.collectionMode = collectionMode;
124
    }
125

    
126
    public String getOozieWfPath() {
127
        return oozieWfPath;
128
    }
129

    
130
    public void setOozieWfPath(String oozieWfPath) {
131
        this.oozieWfPath = oozieWfPath;
132
    }
133
}
(7-7/11)