Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2
import java.text.SimpleDateFormat;
3
import java.util.Date;
4
import java.util.Iterator;
5
import java.util.Map;
6
import java.util.Optional;
7
import java.util.stream.Collectors;
8

    
9
import eu.dnetlib.common.logging.DnetLogger;
10
import eu.dnetlib.dhp.collector.worker.model.ApiDescriptor;
11
import eu.dnetlib.dhp.model.mdstore.Provenance;
12
import eu.dnetlib.enabling.datasources.common.Datasource;
13
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
14
import eu.dnetlib.msro.rmi.MSROException;
15
import org.apache.commons.lang.math.NumberUtils;
16
import org.apache.commons.lang3.StringUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.springframework.beans.factory.annotation.Autowired;
20

    
21
import com.google.gson.Gson;
22
import com.googlecode.sarasvati.Arc;
23
import com.googlecode.sarasvati.NodeToken;
24

    
25
import eu.dnetlib.enabling.datasources.common.ApiParam;
26
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
27
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
28
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
29
import org.springframework.beans.factory.annotation.Value;
30

    
31
import javax.annotation.Resource;
32

    
33
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
34

    
35
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
36

    
37
    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
38
    public static final String DATE_FORMAT = "yyyy-MM-dd";
39

    
40
    private long ONE_DAY = 1000 * 60 * 60 * 24;
41

    
42
    @Resource(name = "msroWorkflowLogger")
43
    private DnetLogger dnetLogger;
44

    
45
    @Autowired
46
    private LocalDatasourceManager<?, ?> dsManager;
47

    
48
    /**
49
     * MDStore identifier
50
     */
51
    private String mdId;
52

    
53
    /**
54
     * REFRESH | INCREMENTAL
55
     */
56
    private String collectionMode;
57

    
58
    /**
59
     * used in INCREMENTAL mode
60
     */
61
    private String fromDateOverride;
62

    
63
    /**
64
     * XML | JSON, default = XML
65
     */
66
    private String metadataEncoding = "XML";
67

    
68
    @Value("${dnet.openaire.dhp.collection.app.path}")
69
    private String oozieWfPath;
70

    
71
    @Override
72
    protected String execute(final NodeToken token) throws Exception {
73

    
74
        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
75
        log.info("dsId: " + dsId);
76
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
77
        log.info("apiId: " + apiId);
78

    
79
        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
80
                .stream()
81
                .filter(a -> a.getId().equals(apiId))
82
                .map(a -> {
83
                    final ApiDescriptor res = new ApiDescriptor();
84
                    res.setBaseUrl(a.getBaseurl());
85
                    res.setId(a.getId());
86
                    res.setProtocol(a.getProtocol());
87
                    res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
88
                    res.getParams().putAll(a.getApiParams()
89
                            .stream()
90
                            .map(o -> (ApiParam) o)
91
                            .collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
92
                    return res;
93
                })
94
                .findFirst();
95

    
96
        if (opt.isPresent()) {
97
            token.getEnv().setAttribute("mdId", getMdId());
98
            token.getEnv().setAttribute("collectionMode", getCollectionMode());
99
            token.getEnv().setAttribute("metadataEncoding", getMetadataEncoding());
100
            token.getEnv().setAttribute("oozieWfPath", getOozieWfPath());
101

    
102
            final ApiDescriptor api = opt.get();
103
            if ("INCREMENTAL".equals(getCollectionMode())) {
104
                final String fromDate = calculateFromDate(token);
105

    
106
                log.info("Incremental Harvesting from: " + fromDate);
107

    
108
                if (StringUtils.isNotBlank(fromDate)) {
109
                   api.getParams().put("fromDate", fromDate);
110
                }
111
            }
112

    
113
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
114

    
115
            final Provenance provenance = new Provenance();
116
            provenance.setDatasourceId(dsId);
117
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
118
            provenance.setDatasourceName(ds.getOfficialname());
119
            provenance.setNsPrefix(ds.getNamespaceprefix());
120
            final String dsProvenance = new Gson().toJson(provenance);
121
            log.info("datasource provenance: " + dsProvenance);
122

    
123
            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
124
            token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
125
            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
126
            token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
127

    
128
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
129
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + "protocol", api.getProtocol());
130
            final Map<String, String> params = api.getParams();
131
            if (params != null) {
132
                for(Map.Entry<String, String> e : params.entrySet()) {
133
                    token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + e.getKey(), e.getValue());
134
                }
135
            }
136

    
137
            return Arc.DEFAULT_ARC;
138
        } else {
139
            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
140
        }
141
    }
142

    
143
    private String findCurrentWfProfileId(NodeToken token) throws MSROException {
144
        final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
145
        if (p1 != null && !p1.isEmpty()) {
146
            return p1;
147
        }
148
        final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
149
        if (p2 != null && !p2.isEmpty()) {
150
            return p2;
151
        }
152
        final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
153
        if (p3 != null && !p3.isEmpty()) {
154
            return p3;
155
        }
156
        throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
157
    }
158

    
159
    private String calculateFromDate(final NodeToken token) throws MSROException {
160

    
161
        if (StringUtils.isNotBlank(getFromDateOverride())) {
162
            log.info("using override FROM_DATE for incremental harvesting: " + getFromDateOverride());
163
            return getFromDateOverride();
164
        }
165
        final String profId = findCurrentWfProfileId(token);
166

    
167
        final long d = findLastSuccessStartDate(profId);
168
        return (d > 0) ? (new SimpleDateFormat(DATE_FORMAT)).format(new Date(d - ONE_DAY)) : null;
169
    }
170

    
171
    private long findLastSuccessStartDate(String profId) {
172
        long res = -1;
173

    
174
        final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId);
175
        while (iter.hasNext()) {
176
            final Map<String, String> map = iter.next();
177
            if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
178
                final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_START_DATE), -1);
179
                if (curr > res) {
180
                    res = curr;
181
                }
182
            }
183
        }
184
        return res;
185
    }
186

    
187

    
188
    public String getMdId() {
189
        return mdId;
190
    }
191

    
192
    public void setMdId(String mdId) {
193
        this.mdId = mdId;
194
    }
195

    
196
    public String getCollectionMode() {
197
        return collectionMode;
198
    }
199

    
200
    public void setCollectionMode(String collectionMode) {
201
        this.collectionMode = collectionMode;
202
    }
203

    
204
    public String getOozieWfPath() {
205
        return oozieWfPath;
206
    }
207

    
208
    public void setOozieWfPath(String oozieWfPath) {
209
        this.oozieWfPath = oozieWfPath;
210
    }
211

    
212
    public String getMetadataEncoding() {
213
        return metadataEncoding;
214
    }
215

    
216
    public void setMetadataEncoding(String metadataEncoding) {
217
        this.metadataEncoding = metadataEncoding;
218
    }
219

    
220
    public String getFromDateOverride() {
221
        return fromDateOverride;
222
    }
223

    
224
    public void setFromDateOverride(String fromDateOverride) {
225
        this.fromDateOverride = fromDateOverride;
226
    }
227
}
(7-7/11)