Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

    
3
import java.text.SimpleDateFormat;
4
import java.util.Date;
5
import java.util.Iterator;
6
import java.util.Map;
7
import java.util.Optional;
8
import java.util.stream.Collectors;
9

    
10
import eu.dnetlib.common.logging.DnetLogger;
11
import eu.dnetlib.dhp.collection.ApiDescriptor;
12
import eu.dnetlib.dhp.model.mdstore.Provenance;
13
import eu.dnetlib.enabling.datasources.common.Datasource;
14
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
15
import eu.dnetlib.msro.rmi.MSROException;
16
import org.apache.commons.lang.math.NumberUtils;
17
import org.apache.commons.lang3.StringUtils;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.springframework.beans.factory.annotation.Autowired;
21

    
22
import com.google.gson.Gson;
23
import com.googlecode.sarasvati.Arc;
24
import com.googlecode.sarasvati.NodeToken;
25

    
26
import eu.dnetlib.enabling.datasources.common.ApiParam;
27
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
28
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
29
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
30
import org.springframework.beans.factory.annotation.Value;
31

    
32
import javax.annotation.Resource;
33

    
34
import static eu.dnetlib.dhp.common.Constants.*;
35

    
36
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
37

    
38
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
39

    
40
    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
41
    public static final String DATE_FORMAT = "yyyy-MM-dd";
42

    
43
    private long ONE_DAY = 1000 * 60 * 60 * 24;
44

    
45
    @Resource(name = "msroWorkflowLogger")
46
    private DnetLogger dnetLogger;
47

    
48
    @Autowired
49
    private LocalDatasourceManager<?, ?> dsManager;
50

    
51
    /**
52
     * MDStore identifier
53
     */
54
    private String mdId;
55

    
56
    /**
57
     * REFRESH | INCREMENTAL
58
     */
59
    private String collectionMode;
60

    
61
    /**
62
     * used in INCREMENTAL mode
63
     */
64
    private String fromDateOverride;
65

    
66
    /**
67
     * used in to set the untilDate
68
     */
69
    private String untilDateOverride;
70

    
71
    /**
72
     * XML | JSON, default = XML
73
     */
74
    private String metadataEncoding = "XML";
75

    
76
    /**
77
     * Maximum number of allowed retires before failing
78
     */
79
    private int maxNumberOfRetry = 5;
80

    
81
    /**
82
     * Delay between request (Milliseconds)
83
     */
84
    private int requestDelay = 0;
85

    
86
    /**
87
     * Time to wait after a failure before retrying (Seconds)
88
     */
89
    private int retryDelay = 60;
90

    
91
    /**
92
     * Connect timeout (Seconds)
93
     */
94
    private int connectTimeOut = 30;
95

    
96
    /**
97
     * Read timeout (Seconds)
98
     */
99
    private int readTimeOut = 60;
100

    
101
    @Value("${dnet.openaire.dhp.collection.app.path}")
102
    private String oozieWfPath;
103

    
104
    @Value("${dnet.openaire.dhp.dnetMessageManagerURL}")
105
    private String dnetMessageManagerURL;
106

    
107
    @Override
108
    protected String execute(final NodeToken token) throws Exception {
109

    
110
        final String dsId = token.getEnv().getAttribute("parentDatasourceId");
111
        log.info("dsId: " + dsId);
112
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
113
        log.info("apiId: " + apiId);
114

    
115
        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
116
                .stream()
117
                .filter(a -> a.getId().equals(apiId))
118
                .map(a -> {
119
                    final ApiDescriptor res = new ApiDescriptor();
120
                    res.setBaseUrl(a.getBaseurl());
121
                    res.setId(a.getId());
122
                    res.setProtocol(a.getProtocol());
123
                    res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
124
                    res.getParams().putAll(a.getApiParams()
125
                            .stream()
126
                            .map(o -> (ApiParam) o)
127
                            .collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
128
                    return res;
129
                })
130
                .findFirst();
131

    
132
        if (opt.isPresent()) {
133
            token.getEnv().setAttribute("mdId", getMdId());
134
            token.getEnv().setAttribute(COLLECTION_MODE, getCollectionMode());
135
            token.getEnv().setAttribute(METADATA_ENCODING, getMetadataEncoding());
136
            token.getEnv().setAttribute(OOZIE_WF_PATH, getOozieWfPath());
137
            token.getEnv().setAttribute(DNET_MESSAGE_MGR_URL, getDnetMessageManagerURL());
138

    
139
            token.getEnv().setAttribute(MAX_NUMBER_OF_RETRY, getMaxNumberOfRetry());
140
            token.getEnv().setAttribute(REQUEST_DELAY, getRequestDelay());
141
            token.getEnv().setAttribute(RETRY_DELAY, getRetryDelay());
142
            token.getEnv().setAttribute(CONNECT_TIMEOUT, getConnectTimeOut());
143
            token.getEnv().setAttribute(READ_TIMEOUT, getReadTimeOut());
144

    
145
            final ApiDescriptor api = opt.get();
146
            if ("INCREMENTAL".equals(getCollectionMode())) {
147
                final String fromDate = calculateFromDate(token);
148

    
149
                log.info("Incremental Harvesting from: " + fromDate);
150

    
151
                if (StringUtils.isNotBlank(fromDate)) {
152
                   api.getParams().put("fromDate", fromDate);
153
                }
154
            }
155

    
156
            if (StringUtils.isNotBlank(getUntilDateOverride())) {
157
                api.getParams().put("untilDate", getUntilDateOverride());
158
            }
159

    
160
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
161

    
162
            final Provenance provenance = new Provenance();
163
            provenance.setDatasourceId(dsId);
164
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
165
            provenance.setDatasourceName(ds.getOfficialname());
166
            provenance.setNsPrefix(ds.getNamespaceprefix());
167
            final String dsProvenance = new Gson().toJson(provenance);
168
            log.info("datasource provenance: " + dsProvenance);
169

    
170
            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
171
            token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
172
            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
173
            token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
174

    
175
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
176
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + "protocol", api.getProtocol());
177
            final Map<String, String> params = api.getParams();
178
            if (params != null) {
179
                for(Map.Entry<String, String> e : params.entrySet()) {
180
                    token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + e.getKey(), e.getValue());
181
                }
182
            }
183

    
184
            return Arc.DEFAULT_ARC;
185
        } else {
186
            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
187
        }
188
    }
189

    
190
    private String findCurrentWfProfileId(NodeToken token) throws MSROException {
191
        final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
192
        if (p1 != null && !p1.isEmpty()) {
193
            return p1;
194
        }
195
        final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
196
        if (p2 != null && !p2.isEmpty()) {
197
            return p2;
198
        }
199
        final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
200
        if (p3 != null && !p3.isEmpty()) {
201
            return p3;
202
        }
203
        throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
204
    }
205

    
206
    private String calculateFromDate(final NodeToken token) throws MSROException {
207

    
208
        if (StringUtils.isNotBlank(getFromDateOverride())) {
209
            log.info("using override FROM_DATE for incremental harvesting: " + getFromDateOverride());
210
            return getFromDateOverride();
211
        }
212
        final String profId = findCurrentWfProfileId(token);
213

    
214
        final long d = findLastSuccessStartDate(profId);
215
        return (d > 0) ? (new SimpleDateFormat(DATE_FORMAT)).format(new Date(d - ONE_DAY)) : null;
216
    }
217

    
218
    private long findLastSuccessStartDate(String profId) {
219
        long res = -1;
220

    
221
        final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId);
222
        while (iter.hasNext()) {
223
            final Map<String, String> map = iter.next();
224
            if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
225
                final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_START_DATE), -1);
226
                if (curr > res) {
227
                    res = curr;
228
                }
229
            }
230
        }
231
        return res;
232
    }
233

    
234

    
235
    public String getMdId() {
236
        return mdId;
237
    }
238

    
239
    public void setMdId(String mdId) {
240
        this.mdId = mdId;
241
    }
242

    
243
    public String getCollectionMode() {
244
        return collectionMode;
245
    }
246

    
247
    public void setCollectionMode(String collectionMode) {
248
        this.collectionMode = collectionMode;
249
    }
250

    
251
    public String getOozieWfPath() {
252
        return oozieWfPath;
253
    }
254

    
255
    public void setOozieWfPath(String oozieWfPath) {
256
        this.oozieWfPath = oozieWfPath;
257
    }
258

    
259
    public String getMetadataEncoding() {
260
        return metadataEncoding;
261
    }
262

    
263
    public void setMetadataEncoding(String metadataEncoding) {
264
        this.metadataEncoding = metadataEncoding;
265
    }
266

    
267
    public String getFromDateOverride() {
268
        return fromDateOverride;
269
    }
270

    
271
    public void setFromDateOverride(String fromDateOverride) {
272
        this.fromDateOverride = fromDateOverride;
273
    }
274

    
275
    public String getUntilDateOverride() {
276
        return untilDateOverride;
277
    }
278

    
279
    public void setUntilDateOverride(String untilDateOverride) {
280
        this.untilDateOverride = untilDateOverride;
281
    }
282

    
283
    public int getMaxNumberOfRetry() {
284
        return maxNumberOfRetry;
285
    }
286

    
287
    public void setMaxNumberOfRetry(int maxNumberOfRetry) {
288
        this.maxNumberOfRetry = maxNumberOfRetry;
289
    }
290

    
291
    public int getRequestDelay() {
292
        return requestDelay;
293
    }
294

    
295
    public void setRequestDelay(int requestDelay) {
296
        this.requestDelay = requestDelay;
297
    }
298

    
299
    public int getRetryDelay() {
300
        return retryDelay;
301
    }
302

    
303
    public void setRetryDelay(int retryDelay) {
304
        this.retryDelay = retryDelay;
305
    }
306

    
307
    public int getConnectTimeOut() {
308
        return connectTimeOut;
309
    }
310

    
311
    public void setConnectTimeOut(int connectTimeOut) {
312
        this.connectTimeOut = connectTimeOut;
313
    }
314

    
315
    public int getReadTimeOut() {
316
        return readTimeOut;
317
    }
318

    
319
    public void setReadTimeOut(int readTimeOut) {
320
        this.readTimeOut = readTimeOut;
321
    }
322

    
323
    public String getDnetMessageManagerURL() {
324
        return dnetMessageManagerURL;
325
    }
326

    
327
    public void setDnetMessageManagerURL(String dnetMessageManagerURL) {
328
        this.dnetMessageManagerURL = dnetMessageManagerURL;
329
    }
330

    
331
}
(7-7/11)