Project

General

Profile

1
/**
 * Note: in the application logs, log.debug(...) output is equivalent to log.trace(...).
 * <p>
 * Known bug: with resumptionType 'discover', collecting fails if (resultTotal % resultSizeValue) == 0;
 * as a workaround, change the resultSizeValue.
 */
6
package eu.dnetlib.data.collector.plugins.rest;
7

    
8
import java.io.InputStream;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.*;

import com.google.common.collect.Maps;
import eu.dnetlib.data.collector.plugins.utils.JsonUtils;
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClients;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
40

    
41
/**
42
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak, Alessia Bardi, Miriam Baglioni
43
 * @date 2020-04-09
44
 */
45
public class RestIterator implements Iterator<String> {
46
    private final String AUTHBASIC = "basic";
47

    
48
    // TODO: clean up the comments of replaced source code
49
    private static final Log log = LogFactory.getLog(RestIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
50
    private static final String XML_HEADER = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
51
    private static final String EMPTY_XML = XML_HEADER + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">";
52
    private JsonUtils jsonUtils;
53

    
54
    private String baseUrl;
55
    private String resumptionType;
56
    private String resumptionParam;
57
    private String resultFormatValue;
58
    private String queryParams = "";
59
    private int resultSizeValue;
60
    private int resumptionInt = 0;            // integer resumption token (first record to harvest)
61
    private int resultTotal = -1;
62
    private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
63
    private InputStream resultStream;
64
    private Transformer transformer;
65
    private XPath xpath;
66
    private String query;
67
    private XPathExpression xprResultTotalPath;
68
    private XPathExpression xprResumptionPath;
69
    private XPathExpression xprEntity;
70
    private String queryFormat;
71
    private String querySize;
72
    private String authMethod;
73
    private String authToken;
74
    private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
75
    private int discoverResultSize = 0;
76
    private int pagination = 1;
77
    /*
78
    While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in json.
79
    useful for cases when the target API expects a resultFormatValue != json, but the results are returned in json.
80
    An example is the EU Open Data Portal API: resultFormatValue=standard, results are in json format.
81
     */
82
    private String resultOutputFormat;
83
    /*
84
    Can be used to set additional request headers, like for content negotiation
85
     */
86
    private Map<String, String> requestHeaders;
87

    
88

    
89
    /**
     * Creates an iterator over the records exposed by a paginated REST API.
     * No request is sent here: the first page is downloaded by the first call to {@code next()}.
     *
     * @param baseUrl            URL of the API; the query string is appended to it
     * @param resumptionType     resumption strategy: scan, count, discover, pagination/page or deep-cursor
     * @param resumptionParam    name of the request parameter carrying the resumption value
     * @param resumptionXpath    XPath evaluated on a result page to read the next resumption token; "/" is used when blank
     * @param resultTotalXpath   XPath evaluated on a result page to read the total number of results
     * @param resultFormatParam  name of the request parameter for the result format; ignored when blank
     * @param resultFormatValue  value sent with {@code resultFormatParam}
     * @param resultSizeParam    name of the request parameter for the page size; ignored when blank
     * @param resultSizeValueStr page size, must be parseable as an int
     * @param queryParams        further query parameters, appended right after the "?"
     * @param entityXpath        XPath selecting the record nodes of a result page
     * @param authMethod         "bearer" or "basic" to send an Authorization header, anything else for none
     * @param authToken          token sent in the Authorization header
     * @param resultOutputFormat "json" when the responses are JSON and must be converted to XML
     * @param requestHeaders     additional request headers, may be null
     * @throws NumberFormatException if {@code resultSizeValueStr} is not an integer
     * @throws IllegalStateException if the XML transformer or one of the XPath expressions cannot be created
     */
    public RestIterator(
            final String baseUrl,
            final String resumptionType,
            final String resumptionParam,
            final String resumptionXpath,
            final String resultTotalXpath,
            final String resultFormatParam,
            final String resultFormatValue,
            final String resultSizeParam,
            final String resultSizeValueStr,
            final String queryParams,
            final String entityXpath,
            final String authMethod,
            final String authToken,
            final String resultOutputFormat,
            final Map<String, String> requestHeaders
    ) {
        this.jsonUtils = new JsonUtils();
        this.baseUrl = baseUrl;
        this.resumptionType = resumptionType;
        this.resumptionParam = resumptionParam;
        this.resultFormatValue = resultFormatValue;
        this.queryParams = queryParams;
        // parseInt yields the primitive directly instead of boxing and unboxing an Integer
        this.resultSizeValue = Integer.parseInt(resultSizeValueStr);
        this.authMethod = authMethod;
        this.authToken = authToken;
        this.resultOutputFormat = resultOutputFormat;
        // Defensive copy: authentication headers are added to this map later on, which must neither
        // modify the caller's map nor fail when the caller passed an unmodifiable one.
        if (requestHeaders != null) {
            this.requestHeaders = Maps.newHashMap(requestHeaders);
        } else {
            this.requestHeaders = Maps.newHashMap();
        }

        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";

        try {
            initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
        } catch (Exception e) {
            // keep the original exception as cause so that the stack trace is not lost
            throw new IllegalStateException("xml transformation init failed: " + e.getMessage(), e);
        }
        initQueue();
    }
128

    
129

    
130
    private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath)
131
            throws TransformerConfigurationException, XPathExpressionException {
132
        transformer = TransformerFactory.newInstance().newTransformer();
133
        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
134
        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
135
        xpath = XPathFactory.newInstance().newXPath();
136
        xprResultTotalPath = xpath.compile(resultTotalXpath);
137
        xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
138
        xprEntity = xpath.compile(entityXpath);
139
    }
140

    
141
    private void initQueue() {
142
        if (queryParams.equals("") && querySize.equals("") && queryFormat.equals("")) {
143
            query = baseUrl;
144
        } else {
145
            query = baseUrl + "?" + queryParams + querySize + queryFormat;
146
        }
147

    
148
        log.info("REST calls starting with " + query);
149
    }
150

    
151
    private void disconnect() {
152
        // TODO close inputstream
153
    }
154

    
155
    /* (non-Javadoc)
156
     * @see java.util.Iterator#hasNext()
157
     */
158
    @Override
159
    public boolean hasNext() {
160
        if (recordQueue.isEmpty() && query.isEmpty()) {
161
            disconnect();
162
            return false;
163
        } else {
164
            return true;
165
        }
166
    }
167

    
168
    /* (non-Javadoc)
169
     * @see java.util.Iterator#next()
170
     */
171
    @Override
172
    public String next() {
173
        synchronized (recordQueue) {
174
            while (recordQueue.isEmpty() && !query.isEmpty()) {
175
                try {
176
                    log.debug("get Query: " + query);
177
                    query = downloadPage(query);
178
                    log.debug("next queryURL from downloadPage(): " + query);
179
                } catch (CollectorServiceException e) {
180
                    log.debug("CollectorPlugin.next()-Exception: " + e);
181
                    throw new RuntimeException(e);
182
                }
183
            }
184
            return recordQueue.poll();
185
        }
186
    }
187

    
188
    /*
189
     * download page and return nextQuery
190
     */
191
    private String downloadPage(String query) throws CollectorServiceException {
192
        String resultJson;
193
        String resultXml = XML_HEADER;
194
        String nextQuery = "";
195
        Node resultNode = null;
196
        NodeList nodeList = null;
197
        String qUrlArgument = "";
198
        int urlOldResumptionSize = 0;
199
        InputStream theHttpInputStream;
200

    
201
        // check if cursor=* is initial set otherwise add it to the queryParam URL
202
        if (resumptionType.equalsIgnoreCase("deep-cursor")) {
203
            log.debug("check resumptionType deep-cursor and check cursor=*?" + query);
204
            if (!query.contains("&cursor=")) {
205
                query += "&cursor=*";
206
            }
207
        }
208

    
209
        try {
210
            URL qUrl = new URL(query);
211
            log.debug("authMethod :" + authMethod);
212
            if (this.authMethod == "bearer") {
213
                log.trace("authMethod before inputStream: " + resultXml);
214
                requestHeaders.put("Authorization", "Bearer " + authToken);
215
                requestHeaders.put("Content-Type", "application/json");
216
            } else if (AUTHBASIC.equalsIgnoreCase(this.authMethod)) {
217
                log.trace("authMethod before inputStream: " + resultXml);
218
                requestHeaders.put("Authorization", "Basic " + authToken);
219
                requestHeaders.put("accept", "application/xml");
220
            }
221

    
222
            HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
223
            conn.setRequestMethod("GET");
224
            this.setRequestHeader(conn);
225
            resultStream = conn.getInputStream();
226

    
227
            if ("json".equals(resultOutputFormat)) {
228
                resultJson = IOUtils.toString(resultStream, "UTF-8");
229
                resultXml = jsonUtils.convertToXML(resultJson);
230
                resultStream = IOUtils.toInputStream(resultXml, "UTF-8");
231
            }
232

    
233
            if (!isEmptyXml(resultXml)) {
234
                resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
235
                nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
236
                log.debug("nodeList.length: " + nodeList.getLength());
237
                for (int i = 0; i < nodeList.getLength(); i++) {
238
                    StringWriter sw = new StringWriter();
239
                    transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
240
                    String toEnqueue = sw.toString();
241
                    if (toEnqueue == null || StringUtils.isBlank(toEnqueue) || isEmptyXml(toEnqueue)) {
242
                        log.warn("The following record resulted in empty item for the feeding queue: " + resultXml);
243
                    } else {
244
                        recordQueue.add(sw.toString());
245
                    }
246
                }
247
            } else {
248
                log.warn("resultXml is equal with emptyXml");
249
            }
250

    
251
            resumptionInt += resultSizeValue;
252

    
253
            switch (resumptionType.toLowerCase()) {
254
                case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
255
                    resumptionStr = xprResumptionPath.evaluate(resultNode);
256
                    break;
257

    
258
                case "count":   // begin at one step for all records, iterate over items
259
                    resumptionStr = Integer.toString(resumptionInt);
260
                    break;
261

    
262
                case "discover":   // size of result items unknown, iterate over items  (for openDOAR - 201808)
263
                    if (resultSizeValue < 2) {
264
                        throw new CollectorServiceException("Mode: discover, Param 'resultSizeValue' is less than 2");
265
                    }
266
                    qUrlArgument = qUrl.getQuery();
267
                    String[] arrayQUrlArgument = qUrlArgument.split("&");
268
                    for (String arrayUrlArgStr : arrayQUrlArgument) {
269
                        if (arrayUrlArgStr.startsWith(resumptionParam)) {
270
                            String[] resumptionKeyValue = arrayUrlArgStr.split("=");
271
                            if (isInteger(resumptionKeyValue[1])) {
272
                                urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
273
                                log.debug("discover OldResumptionSize from Url (int): " + urlOldResumptionSize);
274
                            } else {
275
                                log.debug("discover OldResumptionSize from Url (str): " + resumptionKeyValue[1]);
276
                            }
277
                        }
278
                    }
279

    
280
                    if (isEmptyXml(resultXml) || ((nodeList != null) && (nodeList.getLength() < resultSizeValue))
281
                    ) {
282
                        // resumptionStr = "";
283
                        if (nodeList != null) {
284
                            discoverResultSize += nodeList.getLength();
285
                        }
286
                        resultTotal = discoverResultSize;
287
                    } else {
288
                        resumptionStr = Integer.toString(resumptionInt);
289
                        resultTotal = resumptionInt + 1;
290
                        if (nodeList != null) {
291
                            discoverResultSize += nodeList.getLength();
292
                        }
293
                    }
294
                    log.debug("discoverResultSize:  " + discoverResultSize);
295
                    break;
296

    
297
                case "pagination":
298
                case "page":         // pagination, iterate over page numbers
299
                    pagination += 1;
300
                    if (nodeList != null) {
301
                        discoverResultSize += nodeList.getLength();
302
                    } else {
303
                        resultTotal = discoverResultSize;
304
                        pagination = discoverResultSize;
305
                    }
306
                    resumptionInt = pagination;
307
                    resumptionStr = Integer.toString(resumptionInt);
308
                    break;
309

    
310
                case "deep-cursor":   // size of result items unknown, iterate over items  (for supporting deep cursor in solr)
311
                    // isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: deep-cursor, Param 'resultSizeValue' is less than 2");}
312

    
313
                    resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode));
314
                    queryParams = queryParams.replace("&cursor=*", "");
315

    
316
                    // terminating if length of nodeList is 0
317
                    if ((nodeList != null) && (nodeList.getLength() < discoverResultSize)) {
318
                        resumptionInt += (nodeList.getLength() + 1 - resultSizeValue);
319
                    } else {
320
                        resumptionInt += (nodeList.getLength() - resultSizeValue);    // subtract the resultSizeValue because the iteration is over real length and the resultSizeValue is added before the switch()
321
                    }
322

    
323
                    discoverResultSize = nodeList.getLength();
324

    
325
                    log.debug("downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams=" + queryParams + " resumptionLengthIncreased: " + resumptionInt);
326

    
327
                    break;
328

    
329
                default:        // otherwise: abort
330
                    // resultTotal = resumptionInt;
331
                    break;
332
            }
333

    
334
        } catch (Exception e) {
335
            log.error(e);
336
            throw new IllegalStateException("collection failed: " + e.getMessage());
337
        }
338

    
339
        try {
340
            if (resultTotal == -1) {
341
                resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
342
                if (resumptionType.toLowerCase().equals("page") && !AUTHBASIC.equalsIgnoreCase(authMethod)) {
343
                    resultTotal += 1;
344
                }           // to correct the upper bound
345
                log.info("resultTotal was -1 is now: " + resultTotal);
346
            }
347
        } catch (Exception e) {
348
            log.error(e);
349
            throw new IllegalStateException("downloadPage() resultTotal couldn't parse: " + e.getMessage());
350
        }
351
        log.debug("resultTotal: " + resultTotal);
352
        log.debug("resInt: " + resumptionInt);
353
        if (resumptionInt <= resultTotal) {
354
            nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
355
        } else {
356
            nextQuery = "";
357
            // if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; }    	// correct the resumptionInt and prevent a NullPointer Exception at mdStore
358
        }
359
        log.debug("downloadPage() nextQueryUrl: " + nextQuery);
360
        return nextQuery;
361

    
362

    
363
    }
364

    
365
    private boolean isEmptyXml(String s){
366
        return EMPTY_XML.equalsIgnoreCase(s);
367
    }
368

    
369

    
370
    private boolean isInteger(String s) {
371
        boolean isValidInteger = false;
372
        try {
373
            Integer.parseInt(s);
374

    
375
            // s is a valid integer
376

    
377
            isValidInteger = true;
378
        } catch (NumberFormatException ex) {
379
            // s is not an integer
380
        }
381

    
382
        return isValidInteger;
383
    }
384

    
385
    // Method to encode a string value using `UTF-8` encoding scheme
386
    private String encodeValue(String value) {
387
        try {
388
            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
389
        } catch (UnsupportedEncodingException ex) {
390
            throw new RuntimeException(ex.getCause());
391
        }
392
    }
393

    
394
    private void setRequestHeader(HttpURLConnection conn) {
395
        if (requestHeaders != null) {
396
            for (String key : requestHeaders.keySet()) {
397
                conn.setRequestProperty(key, requestHeaders.get(key));
398
            }
399
            log.debug("Set Request Header with: " + requestHeaders);
400
        }
401

    
402
    }
403

    
404
    public String getResultFormatValue() {
405
        return resultFormatValue;
406
    }
407

    
408
    public String getResultOutputFormat() {
409
        return resultOutputFormat;
410
    }
411

    
412
}
(2-2/2)