Project

General

Profile

1
/**
2
 * log.debug(...) is equivalent to log.trace(...) in the application logs
3
 * <p>
4
 * Known bug: with resumptionType 'discover', collecting fails if (resultTotal % resultSizeValue) == 0; workaround: change the resultSizeValue
5
 */
6
package eu.dnetlib.data.collector.plugins.rest;
7

    
8
import java.io.InputStream;
9
import java.io.StringWriter;
10
import java.io.UnsupportedEncodingException;
11
import java.net.URL;
12
import java.net.URLEncoder;
13
import java.nio.charset.StandardCharsets;
14
import java.net.HttpURLConnection;
15
import java.util.Iterator;
16
import java.util.Map;
17
import java.util.Queue;
18
import java.util.concurrent.PriorityBlockingQueue;
19
import javax.xml.transform.OutputKeys;
20
import javax.xml.transform.Transformer;
21
import javax.xml.transform.TransformerConfigurationException;
22
import javax.xml.transform.TransformerFactory;
23
import javax.xml.transform.dom.DOMSource;
24
import javax.xml.transform.stream.StreamResult;
25
import javax.xml.xpath.*;
26

    
27
import com.google.common.collect.Maps;
28
import eu.dnetlib.data.collector.plugins.utils.JsonUtils;
29
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
30
import org.apache.commons.io.IOUtils;
31
import org.apache.commons.lang3.StringUtils;
32
import org.apache.commons.logging.Log;
33
import org.apache.commons.logging.LogFactory;
34
import org.apache.http.client.methods.CloseableHttpResponse;
35
import org.apache.http.client.methods.HttpGet;
36
import org.apache.http.impl.client.HttpClients;
37
import org.w3c.dom.Node;
38
import org.w3c.dom.NodeList;
39
import org.xml.sax.InputSource;
40

    
41
/**
42
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak, Alessia Bardi, Miriam Baglioni
43
 * @date 2020-04-09
44
 */
45
public class RestIterator implements Iterator<String> {
46
    private final String AUTHBASIC = "basic";
47

    
48
    // TODO: clean up the comments of replaced source code
49
    private static final Log log = LogFactory.getLog(RestIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
50
    private static final String XML_HEADER = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
51
    private static final String EMPTY_XML = XML_HEADER + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">";
52
    private JsonUtils jsonUtils;
53

    
54
    private String baseUrl;
55
    private String resumptionType;
56
    private String resumptionParam;
57
    private String resultFormatValue;
58
    private String queryParams = "";
59
    private int resultSizeValue;
60
    private int resumptionInt = 0;            // integer resumption token (first record to harvest)
61
    private int resultTotal = -1;
62
    private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
63
    private InputStream resultStream;
64
    private Transformer transformer;
65
    private XPath xpath;
66
    private String query;
67
    private XPathExpression xprResultTotalPath;
68
    private XPathExpression xprResumptionPath;
69
    private XPathExpression xprEntity;
70
    private String queryFormat;
71
    private String querySize;
72
    private String authMethod;
73
    private String authToken;
74
    private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
75
    private int discoverResultSize = 0;
76
    private int pagination = 1;
77
    /*
78
    While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in json.
79
    useful for cases when the target API expects a resultFormatValue != json, but the results are returned in json.
80
    An example is the EU Open Data Portal API: resultFormatValue=standard, results are in json format.
81
     */
82
    private String resultOutputFormat;
83
    /*
84
    Can be used to set additional request headers, like for content negotiation
85
     */
86
    private Map<String, String> requestHeaders;
87

    
88

    
89
    /**
     * Creates an iterator over the records exposed by a paged REST endpoint.
     * No request is sent here; only the first page URL and the XPath machinery are prepared.
     *
     * @param baseUrl            endpoint URL without query string
     * @param resumptionType     paging strategy: scan, count, discover, page/pagination or deep-cursor
     * @param resumptionParam    name of the request parameter that carries the resumption value
     * @param resumptionXpath    XPath of the resumption token in a response; "/" is used when blank
     * @param resultTotalXpath   XPath of the total number of results in a response
     * @param resultFormatParam  name of the result-format request parameter; omitted from the URL when blank
     * @param resultFormatValue  value of the result-format request parameter
     * @param resultSizeParam    name of the page-size request parameter; omitted from the URL when blank
     * @param resultSizeValueStr page size, must be parseable as an integer
     * @param queryParams        static part of the query string; {@code null} is treated as empty
     * @param entityXpath        XPath selecting the individual records of a response
     * @param authMethod         "bearer", "basic" or anything else for no authentication
     * @param authToken          token sent in the Authorization header for the chosen method
     * @param resultOutputFormat "json" if responses must be converted from JSON to XML
     * @param requestHeaders     additional request headers; {@code null} is treated as none
     * @throws NumberFormatException if {@code resultSizeValueStr} is not an integer
     * @throws IllegalStateException if the transformer or one of the XPath expressions cannot be set up
     */
    public RestIterator(
            final String baseUrl,
            final String resumptionType,
            final String resumptionParam,
            final String resumptionXpath,
            final String resultTotalXpath,
            final String resultFormatParam,
            final String resultFormatValue,
            final String resultSizeParam,
            final String resultSizeValueStr,
            final String queryParams,
            final String entityXpath,
            final String authMethod,
            final String authToken,
            final String resultOutputFormat,
            final Map<String, String> requestHeaders
    ) {
        this.jsonUtils = new JsonUtils();
        this.baseUrl = baseUrl;
        this.resumptionType = resumptionType;
        this.resumptionParam = resumptionParam;
        this.resultFormatValue = resultFormatValue;
        // initQueue() and downloadPage() concatenate and compare this value, so never leave it null
        this.queryParams = queryParams != null ? queryParams : "";
        this.resultSizeValue = Integer.parseInt(resultSizeValueStr);
        this.authMethod = authMethod;
        this.authToken = authToken;
        this.resultOutputFormat = resultOutputFormat;
        this.requestHeaders = requestHeaders != null ? requestHeaders : Maps.newHashMap();

        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";

        try {
            initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
        } catch (Exception e) {
            // keep the original exception as cause so that the failing XPath can be diagnosed
            throw new IllegalStateException("xml transformation init failed: " + e.getMessage(), e);
        }
        initQueue();
    }
128

    
129

    
130
    private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath)
131
            throws TransformerConfigurationException, XPathExpressionException {
132
        transformer = TransformerFactory.newInstance().newTransformer();
133
        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
134
        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
135
        xpath = XPathFactory.newInstance().newXPath();
136
        xprResultTotalPath = xpath.compile(resultTotalXpath);
137
        xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
138
        xprEntity = xpath.compile(entityXpath);
139
    }
140

    
141
    private void initQueue() {
142
        if (queryParams.equals("") && querySize.equals("") && queryFormat.equals("")) {
143
            query = baseUrl;
144
        } else {
145
            query = baseUrl + "?" + queryParams + querySize + queryFormat;
146
        }
147

    
148
        log.info("RestIterator.initQueue():: REST calls starting with " + query);
149
    }
150

    
151
    private void disconnect() {
152
        // TODO close inputstream
153
    }
154

    
155
    /* (non-Javadoc)
156
     * @see java.util.Iterator#hasNext()
157
     */
158
    @Override
159
    public boolean hasNext() {
160
        if (recordQueue.isEmpty() && query.isEmpty()) {
161
            disconnect();
162
            return false;
163
        } else {
164
            return true;
165
        }
166
    }
167

    
168
    /* (non-Javadoc)
169
     * @see java.util.Iterator#next()
170
     */
171
    @Override
172
    public String next() {
173
        synchronized (recordQueue) {
174
            while (recordQueue.isEmpty() && !query.isEmpty()) {
175
                try {
176
                    log.debug("RestIterator.next():: get Query: " + query);
177
                    query = downloadPage(query);
178
                    log.debug("RestIterator.next():: next queryURL from downloadPage(): " + query);
179
                } catch (CollectorServiceException e) {
180
                    log.debug("RestIterator.next():: CollectorPlugin.next()-Exception: " + e);
181
                    throw new RuntimeException(e);
182
                }
183
            }
184
            return recordQueue.poll();
185
        }
186
    }
187

    
188
    /*
189
     * download page and return nextQuery
190
     */
191
    private String downloadPage(String query) throws CollectorServiceException {
192
        String resultJson;
193
        String resultXml = XML_HEADER;
194
        String nextQuery = "";
195
        Node resultNode = null;
196
        NodeList nodeList = null;
197
        String qUrlArgument = "";
198
        int urlOldResumptionSize = 0;
199
        InputStream theHttpInputStream;
200

    
201
        // check if cursor=* is initial set otherwise add it to the queryParam URL
202
        if (resumptionType.equalsIgnoreCase("deep-cursor")) {
203
            log.debug("RestIterator.downloadPage():: check resumptionType deep-cursor and check cursor=*?" + query);
204
            if (!query.contains("&cursor=")) {
205
                query += "&cursor=*";
206
            }
207
        }
208

    
209
        try {
210
            URL qUrl = new URL(query);
211
            log.debug("authMethod :" + authMethod);
212
            if (this.authMethod == "bearer") {
213
                log.trace("RestIterator.downloadPage():: authMethod before inputStream: " + resultXml);
214
                requestHeaders.put("Authorization", "Bearer " + authToken);
215
                //requestHeaders.put("Content-Type", "application/json");
216
            } else if (AUTHBASIC.equalsIgnoreCase(this.authMethod)) {
217
                log.trace("RestIterator.downloadPage():: authMethod before inputStream: " + resultXml);
218
                requestHeaders.put("Authorization", "Basic " + authToken);
219
                //requestHeaders.put("accept", "application/xml");
220
            }
221

    
222
            HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
223
            conn.setRequestMethod("GET");
224
            this.setRequestHeader(conn);
225
            resultStream = conn.getInputStream();
226

    
227
            if ("json".equals(resultOutputFormat)) {
228
                resultJson = IOUtils.toString(resultStream, "UTF-8");
229
                resultXml = jsonUtils.convertToXML(resultJson);
230
                resultStream = IOUtils.toInputStream(resultXml, "UTF-8");
231
            }
232

    
233
            if (!isEmptyXml(resultXml)) {
234
                resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
235
                nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
236
                log.debug("RestIterator.downloadPage():: nodeList.length=" + nodeList.getLength());
237
                for (int i = 0; i < nodeList.getLength(); i++) {
238
                    StringWriter sw = new StringWriter();
239
                    transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
240
                    String toEnqueue = sw.toString();
241
                    if (toEnqueue == null || StringUtils.isBlank(toEnqueue) || isEmptyXml(toEnqueue)) {
242
                        log.warn("RestIterator.downloadPage():: The following record resulted in empty item for the feeding queue: " + resultXml);
243
                    } else {
244
                        recordQueue.add(sw.toString());
245
                    }
246
                }
247
            } else {
248
                log.warn("resultXml is equal with emptyXml");
249
            }
250

    
251
            resumptionInt += resultSizeValue;
252

    
253
            switch (resumptionType.toLowerCase()) {
254
                case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
255
                    resumptionStr = xprResumptionPath.evaluate(resultNode);
256
                    break;
257

    
258
                case "count":   // begin at one step for all records, iterate over items
259
                    resumptionStr = Integer.toString(resumptionInt);
260
                    break;
261

    
262
                case "discover":   // size of result items unknown, iterate over items  (for openDOAR - 201808)
263
                    if (resultSizeValue < 2) {
264
                        log.debug("RestIterator.downloadPage().discover:: ode: discover, Param 'resultSizeValue' must greater then 1");
265
                        throw new CollectorServiceException("Mode: discover, Param 'resultSizeValue' must greater then 1");
266
                    }
267
                    log.debug("RestIterator.downloadPage().discover:: resumptionInt="+Integer.toString(resumptionInt)+"; ");
268
                    qUrlArgument = qUrl.getQuery();
269

    
270
                    if( qUrlArgument != null ) {
271
                    String[] arrayQUrlArgument = qUrlArgument.split("&");
272

    
273
                    // check if URL arguments given
274
                    if( arrayQUrlArgument != null ) {
275
                        for (String arrayUrlArgStr : arrayQUrlArgument) {
276
                            log.debug("RestIterator.downloadPage/discover:: "+arrayUrlArgStr);
277
                            if (arrayUrlArgStr.startsWith(resumptionParam)) {
278
                                String[] resumptionKeyValue = arrayUrlArgStr.split("=");
279
                                if (isInteger(resumptionKeyValue[1])) {
280
                                    urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
281
                                    log.debug("RestIterator.downloadPage():discover:: OldResumptionSize from Url (int): " + urlOldResumptionSize);
282
                                } else {
283
                                    log.debug("RestIterator.downloadPage().discover:: OldResumptionSize from Url (str): " + resumptionKeyValue[1]);
284
                                }
285
                            }
286
                        }
287
                    }
288
                    }
289
                    log.debug("RestIterator.downloadPage().discover:: nodeList.length=" + nodeList.getLength());
290

    
291
                    if (isEmptyXml(resultXml) || ((nodeList != null) && (nodeList.getLength() < resultSizeValue))
292
                    ) {
293
                        // resumptionStr = "";
294
                        if (nodeList != null) {
295
                            discoverResultSize += nodeList.getLength();
296
                        }
297
                        resultTotal = discoverResultSize;
298
                    } else {
299
                        resumptionStr = Integer.toString(resumptionInt);
300
                        resultTotal = resumptionInt + 1;
301
                        if (nodeList != null) {
302
                            discoverResultSize += nodeList.getLength();
303
                        }
304
                    }
305
                    log.debug("RestIterator.downloadPage().discover:: discoverResultSize=" + discoverResultSize);
306
                    break;
307

    
308
                case "pagination":
309
                case "page":         // pagination, iterate over page numbers
310
                    pagination += 1;
311
                    if (nodeList != null) {
312
                        discoverResultSize += nodeList.getLength();
313
                    } else {
314
                        resultTotal = discoverResultSize;
315
                        pagination = discoverResultSize;
316
                    }
317
                    resumptionInt = pagination;
318
                    resumptionStr = Integer.toString(resumptionInt);
319
                    break;
320

    
321
                case "deep-cursor":   // size of result items unknown, iterate over items  (for supporting deep cursor in solr)
322
                    // isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: deep-cursor, Param 'resultSizeValue' is less than 2");}
323

    
324
                    resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode));
325
                    queryParams = queryParams.replace("&cursor=*", "");
326

    
327
                    // terminating if length of nodeList is 0
328
                    if ((nodeList != null) && (nodeList.getLength() < discoverResultSize)) {
329
                        resumptionInt += (nodeList.getLength() + 1 - resultSizeValue);
330
                    } else {
331
                        resumptionInt += (nodeList.getLength() - resultSizeValue);    // subtract the resultSizeValue because the iteration is over real length and the resultSizeValue is added before the switch()
332
                    }
333

    
334
                    discoverResultSize = nodeList.getLength();
335

    
336
                    log.debug("RestIterator.downloadPage().deep-cursor:: resumptionStr=" + resumptionStr + " ; queryParams=" + queryParams + " resumptionLengthIncreased: " + resumptionInt);
337

    
338
                    break;
339

    
340
                default:        // otherwise: abort
341
                    // resultTotal = resumptionInt;
342
                    break;
343
            }
344

    
345
        } catch (Exception e) {
346
            log.error(e);
347
            throw new IllegalStateException("collection failed: " + e.getMessage());
348
        }
349

    
350
        try {
351
            if (resultTotal == -1) {
352
                resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
353
                if (resumptionType.toLowerCase().equals("page") && !AUTHBASIC.equalsIgnoreCase(authMethod)) {
354
                    resultTotal += 1;
355
                }           // to correct the upper bound
356
                log.info("resultTotal was -1 is now: " + resultTotal);
357
            }
358
        } catch (Exception e) {
359
            log.error(e);
360
            throw new IllegalStateException("downloadPage() resultTotal couldn't parse: " + e.getMessage());
361
        }
362
        log.debug("resultTotal: " + resultTotal);
363
        log.debug("resInt: " + resumptionInt);
364
        if (resumptionInt <= resultTotal) {
365
            nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
366
        } else {
367
            nextQuery = "";
368
            // if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; }    	// correct the resumptionInt and prevent a NullPointer Exception at mdStore
369
        }
370
        log.debug("downloadPage() nextQueryUrl: " + nextQuery);
371
        return nextQuery;
372

    
373

    
374
    }
375

    
376
    private boolean isEmptyXml(String s){
377
        return EMPTY_XML.equalsIgnoreCase(s);
378
    }
379

    
380

    
381
    private boolean isInteger(String s) {
382
        boolean isValidInteger = false;
383
        try {
384
            Integer.parseInt(s);
385

    
386
            // s is a valid integer
387

    
388
            isValidInteger = true;
389
        } catch (NumberFormatException ex) {
390
            // s is not an integer
391
        }
392

    
393
        return isValidInteger;
394
    }
395

    
396
    // Method to encode a string value using `UTF-8` encoding scheme
397
    private String encodeValue(String value) {
398
        try {
399
            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
400
        } catch (UnsupportedEncodingException ex) {
401
            throw new RuntimeException(ex.getCause());
402
        }
403
    }
404

    
405
    /**
406
     * setRequestHeader
407
     * 
408
     * setRequestProperty: Sets the general request property. If a property with the key already exists, overwrite its value with the new value.
409
     * @param conn
410
     */
411
    private void setRequestHeader(HttpURLConnection conn) {
412
        if (requestHeaders != null) {
413
            for (String key : requestHeaders.keySet()) {
414
                conn.setRequestProperty(key, requestHeaders.get(key));
415
            }
416
            log.debug("Set Request Header with: " + requestHeaders);
417
        }
418

    
419
    }
420

    
421
    public String getResultFormatValue() {
422
        return resultFormatValue;
423
    }
424

    
425
    public String getResultOutputFormat() {
426
        return resultOutputFormat;
427
    }
428

    
429
}
(2-2/2)