Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

    
3
import java.text.SimpleDateFormat;
4
import java.util.Date;
5
import java.util.Iterator;
6
import java.util.Map;
7
import java.util.Optional;
8
import java.util.stream.Collectors;
9

    
10
import eu.dnetlib.common.logging.DnetLogger;
11
import eu.dnetlib.dhp.collection.ApiDescriptor;
12
import eu.dnetlib.dhp.model.mdstore.Provenance;
13
import eu.dnetlib.enabling.datasources.common.Datasource;
14
import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
15
import eu.dnetlib.msro.rmi.MSROException;
16
import org.apache.commons.lang.math.NumberUtils;
17
import org.apache.commons.lang3.StringUtils;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.springframework.beans.factory.annotation.Autowired;
21

    
22
import com.google.gson.Gson;
23
import com.googlecode.sarasvati.Arc;
24
import com.googlecode.sarasvati.NodeToken;
25

    
26
import eu.dnetlib.enabling.datasources.common.ApiParam;
27
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
28
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
29
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
30
import org.springframework.beans.factory.annotation.Value;
31

    
32
import javax.annotation.Resource;
33

    
34
import static eu.dnetlib.dhp.common.Constants.*;
35

    
36
public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
37

    
38
    private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
39

    
40
    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
41
    public static final String DATE_FORMAT = "yyyy-MM-dd";
42

    
43
    private long ONE_DAY = 1000 * 60 * 60 * 24;
44

    
45
    @Resource(name = "msroWorkflowLogger")
46
    private DnetLogger dnetLogger;
47

    
48
    @Autowired
49
    private LocalDatasourceManager<?, ?> dsManager;
50

    
51
    /**
52
     * MDStore identifier
53
     */
54
    private String mdId;
55

    
56
    /**
57
     * REFRESH | INCREMENTAL
58
     */
59
    private String collectionMode;
60

    
61
    /**
62
     * used in INCREMENTAL mode
63
     */
64
    private String fromDateOverride;
65

    
66
    /**
67
     * XML | JSON, default = XML
68
     */
69
    private String metadataEncoding = "XML";
70

    
71
    /**
72
     * Maximum number of allowed retires before failing
73
     */
74
    private int maxNumberOfRetry = 5;
75

    
76
    /**
77
     * Delay between request (Milliseconds)
78
     */
79
    private int requestDelay = 0;
80

    
81
    /**
82
     * Time to wait after a failure before retrying (Seconds)
83
     */
84
    private int retryDelay = 60;
85

    
86
    /**
87
     * Connect timeout (Seconds)
88
     */
89
    private int connectTimeOut = 30;
90

    
91
    /**
92
     * Read timeout (Seconds)
93
     */
94
    private int readTimeOut = 60;
95

    
96
    @Value("${dnet.openaire.dhp.collection.app.path}")
97
    private String oozieWfPath;
98

    
99
    @Value("${dnet.openaire.dhp.dnetMessageManagerURL}")
100
    private String dnetMessageManagerURL;
101

    
102
    @Override
103
    protected String execute(final NodeToken token) throws Exception {
104

    
105
        final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
106
        log.info("dsId: " + dsId);
107
        final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
108
        log.info("apiId: " + apiId);
109

    
110
        final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
111
                .stream()
112
                .filter(a -> a.getId().equals(apiId))
113
                .map(a -> {
114
                    final ApiDescriptor res = new ApiDescriptor();
115
                    res.setBaseUrl(a.getBaseurl());
116
                    res.setId(a.getId());
117
                    res.setProtocol(a.getProtocol());
118
                    res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
119
                    res.getParams().putAll(a.getApiParams()
120
                            .stream()
121
                            .map(o -> (ApiParam) o)
122
                            .collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
123
                    return res;
124
                })
125
                .findFirst();
126

    
127
        if (opt.isPresent()) {
128
            token.getEnv().setAttribute("mdId", getMdId());
129
            token.getEnv().setAttribute(COLLECTION_MODE, getCollectionMode());
130
            token.getEnv().setAttribute(METADATA_ENCODING, getMetadataEncoding());
131
            token.getEnv().setAttribute(OOZIE_WF_PATH, getOozieWfPath());
132
            token.getEnv().setAttribute(DNET_MESSAGE_MGR_URL, getDnetMessageManagerURL());
133

    
134
            token.getEnv().setAttribute(MAX_NUMBER_OF_RETRY, getMaxNumberOfRetry());
135
            token.getEnv().setAttribute(REQUEST_DELAY, getRequestDelay());
136
            token.getEnv().setAttribute(RETRY_DELAY, getRetryDelay());
137
            token.getEnv().setAttribute(CONNECT_TIMEOUT, getConnectTimeOut());
138
            token.getEnv().setAttribute(READ_TIMEOUT, getReadTimeOut());
139

    
140
            final ApiDescriptor api = opt.get();
141
            if ("INCREMENTAL".equals(getCollectionMode())) {
142
                final String fromDate = calculateFromDate(token);
143

    
144
                log.info("Incremental Harvesting from: " + fromDate);
145

    
146
                if (StringUtils.isNotBlank(fromDate)) {
147
                   api.getParams().put("fromDate", fromDate);
148
                }
149
            }
150

    
151
            token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
152

    
153
            final Provenance provenance = new Provenance();
154
            provenance.setDatasourceId(dsId);
155
            final Datasource<?, ?> ds = dsManager.getDs(dsId);
156
            provenance.setDatasourceName(ds.getOfficialname());
157
            provenance.setNsPrefix(ds.getNamespaceprefix());
158
            final String dsProvenance = new Gson().toJson(provenance);
159
            log.info("datasource provenance: " + dsProvenance);
160

    
161
            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
162
            token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
163
            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
164
            token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
165

    
166
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
167
            token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + "protocol", api.getProtocol());
168
            final Map<String, String> params = api.getParams();
169
            if (params != null) {
170
                for(Map.Entry<String, String> e : params.entrySet()) {
171
                    token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_PREFIX + e.getKey(), e.getValue());
172
                }
173
            }
174

    
175
            return Arc.DEFAULT_ARC;
176
        } else {
177
            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
178
        }
179
    }
180

    
181
    private String findCurrentWfProfileId(NodeToken token) throws MSROException {
182
        final String p1 = token.getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
183
        if (p1 != null && !p1.isEmpty()) {
184
            return p1;
185
        }
186
        final String p2 = token.getFullEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
187
        if (p2 != null && !p2.isEmpty()) {
188
            return p2;
189
        }
190
        final String p3 = token.getProcess().getEnv().getAttribute(WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
191
        if (p3 != null && !p3.isEmpty()) {
192
            return p3;
193
        }
194
        throw new MSROException("Missing property in env: " + WorkflowsConstants.SYSTEM_WF_PROFILE_ID);
195
    }
196

    
197
    private String calculateFromDate(final NodeToken token) throws MSROException {
198

    
199
        if (StringUtils.isNotBlank(getFromDateOverride())) {
200
            log.info("using override FROM_DATE for incremental harvesting: " + getFromDateOverride());
201
            return getFromDateOverride();
202
        }
203
        final String profId = findCurrentWfProfileId(token);
204

    
205
        final long d = findLastSuccessStartDate(profId);
206
        return (d > 0) ? (new SimpleDateFormat(DATE_FORMAT)).format(new Date(d - ONE_DAY)) : null;
207
    }
208

    
209
    private long findLastSuccessStartDate(String profId) {
210
        long res = -1;
211

    
212
        final Iterator<Map<String, String>> iter = dnetLogger.find(WorkflowsConstants.SYSTEM_WF_PROFILE_ID, profId);
213
        while (iter.hasNext()) {
214
            final Map<String, String> map = iter.next();
215
            if ("true".equalsIgnoreCase(map.get(WorkflowsConstants.SYSTEM_COMPLETED_SUCCESSFULLY))) {
216
                final long curr = NumberUtils.toLong(map.get(WorkflowsConstants.SYSTEM_START_DATE), -1);
217
                if (curr > res) {
218
                    res = curr;
219
                }
220
            }
221
        }
222
        return res;
223
    }
224

    
225

    
226
    public String getMdId() {
227
        return mdId;
228
    }
229

    
230
    public void setMdId(String mdId) {
231
        this.mdId = mdId;
232
    }
233

    
234
    public String getCollectionMode() {
235
        return collectionMode;
236
    }
237

    
238
    public void setCollectionMode(String collectionMode) {
239
        this.collectionMode = collectionMode;
240
    }
241

    
242
    public String getOozieWfPath() {
243
        return oozieWfPath;
244
    }
245

    
246
    public void setOozieWfPath(String oozieWfPath) {
247
        this.oozieWfPath = oozieWfPath;
248
    }
249

    
250
    public String getMetadataEncoding() {
251
        return metadataEncoding;
252
    }
253

    
254
    public void setMetadataEncoding(String metadataEncoding) {
255
        this.metadataEncoding = metadataEncoding;
256
    }
257

    
258
    public String getFromDateOverride() {
259
        return fromDateOverride;
260
    }
261

    
262
    public void setFromDateOverride(String fromDateOverride) {
263
        this.fromDateOverride = fromDateOverride;
264
    }
265

    
266
    public int getMaxNumberOfRetry() {
267
        return maxNumberOfRetry;
268
    }
269

    
270
    public void setMaxNumberOfRetry(int maxNumberOfRetry) {
271
        this.maxNumberOfRetry = maxNumberOfRetry;
272
    }
273

    
274
    public int getRequestDelay() {
275
        return requestDelay;
276
    }
277

    
278
    public void setRequestDelay(int requestDelay) {
279
        this.requestDelay = requestDelay;
280
    }
281

    
282
    public int getRetryDelay() {
283
        return retryDelay;
284
    }
285

    
286
    public void setRetryDelay(int retryDelay) {
287
        this.retryDelay = retryDelay;
288
    }
289

    
290
    public int getConnectTimeOut() {
291
        return connectTimeOut;
292
    }
293

    
294
    public void setConnectTimeOut(int connectTimeOut) {
295
        this.connectTimeOut = connectTimeOut;
296
    }
297

    
298
    public int getReadTimeOut() {
299
        return readTimeOut;
300
    }
301

    
302
    public void setReadTimeOut(int readTimeOut) {
303
        this.readTimeOut = readTimeOut;
304
    }
305

    
306
    public String getDnetMessageManagerURL() {
307
        return dnetMessageManagerURL;
308
    }
309

    
310
    public void setDnetMessageManagerURL(String dnetMessageManagerURL) {
311
        this.dnetMessageManagerURL = dnetMessageManagerURL;
312
    }
313

    
314
}
(7-7/11)