Project

General

Profile

1 50066 jochen.sch
/**
2 52983 andreas.cz
 * log.debug(...) equal to  log.trace(...) in the application-logs
3 53854 alessia.ba
 * <p>
4
 * Known bug: with resumptionType 'discover', collecting fails when (resultTotal % resultSizeValue) == 0; as a workaround, choose a different resultSizeValue.
5 50066 jochen.sch
 */
6
package eu.dnetlib.data.collector.plugins.rest;
7
8
import java.io.InputStream;
9 50582 jochen.sch
import java.io.StringWriter;
10 58456 andreas.cz
import java.io.UnsupportedEncodingException;
11 50066 jochen.sch
import java.net.URL;
12 58456 andreas.cz
import java.net.URLEncoder;
13
import java.nio.charset.StandardCharsets;
14 58315 andreas.cz
import java.net.HttpURLConnection;
15 50066 jochen.sch
import java.util.Iterator;
16
import java.util.Queue;
17 50582 jochen.sch
import java.util.concurrent.PriorityBlockingQueue;
18 50066 jochen.sch
import javax.xml.transform.OutputKeys;
19
import javax.xml.transform.Transformer;
20
import javax.xml.transform.TransformerConfigurationException;
21
import javax.xml.transform.TransformerFactory;
22 50582 jochen.sch
import javax.xml.transform.dom.DOMSource;
23
import javax.xml.transform.stream.StreamResult;
24 53854 alessia.ba
import javax.xml.xpath.*;
25 50066 jochen.sch
26 53854 alessia.ba
import eu.dnetlib.data.collector.plugins.oai.engine.XmlCleaner;
27 58993 alessia.ba
import eu.dnetlib.data.collector.plugins.utils.JsonUtils;
28 53854 alessia.ba
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
29 50066 jochen.sch
import org.apache.commons.io.IOUtils;
30 50662 alessia.ba
import org.apache.commons.lang3.StringUtils;
31 50582 jochen.sch
import org.apache.commons.logging.Log;
32
import org.apache.commons.logging.LogFactory;
33 50066 jochen.sch
import org.w3c.dom.Node;
34 50582 jochen.sch
import org.w3c.dom.NodeList;
35 50066 jochen.sch
import org.xml.sax.InputSource;
36
37
/**
38 52970 andreas.cz
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
39 58456 andreas.cz
 * @date 2020-04-09
40 50066 jochen.sch
 *
41
 */
42
public class RestIterator implements Iterator<String> {
43
44 53854 alessia.ba
	// TODO: clean up the comments of replaced source code
45 53853 alessia.ba
	private static final Log log = LogFactory.getLog(RestIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
46 50582 jochen.sch
47 58993 alessia.ba
	private JsonUtils jsonUtils;
48
49 50066 jochen.sch
	private String baseUrl;
50
	private String resumptionType;
51
	private String resumptionParam;
52
	private String resultFormatValue;
53
	private String queryParams;
54 52970 andreas.cz
	private int resultSizeValue;
55 53854 alessia.ba
	private int resumptionInt = 0;            // integer resumption token (first record to harvest)
56 50066 jochen.sch
	private int resultTotal = -1;
57
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
58
	private InputStream resultStream;
59
	private Transformer transformer;
60
	private XPath xpath;
61 50582 jochen.sch
	private String query;
62 50066 jochen.sch
	private XPathExpression xprResultTotalPath;
63
	private XPathExpression xprResumptionPath;
64 50582 jochen.sch
	private XPathExpression xprEntity;
65 50066 jochen.sch
	private String queryFormat;
66
	private String querySize;
67 58315 andreas.cz
	private String authMethod;
68
	private String authToken;
69 50582 jochen.sch
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
70 53854 alessia.ba
	private int discoverResultSize = 0;
71
	private int pagination = 1;
72 58532 andreas.cz
73 52983 andreas.cz
74 58427 andreas.cz
	/**
75
	 * RestIterator class
76
	 *
77
	 * compatible to version before 1.3.33
78
	 *
79
	 * @param baseUrl
80
	 * @param resumptionType
81
	 * @param resumptionParam
82
	 * @param resumptionXpath
83
	 * @param resultTotalXpath
84
	 * @param resultFormatParam
85
	 * @param resultFormatValue
86
	 * @param resultSizeParam
87
	 * @param resultSizeValueStr
88
	 * @param queryParams
89
	 * @param entityXpath
90
	 */
91 50066 jochen.sch
	public RestIterator(
92
			final String baseUrl,
93
			final String resumptionType,
94
			final String resumptionParam,
95
			final String resumptionXpath,
96
			final String resultTotalXpath,
97
			final String resultFormatParam,
98
			final String resultFormatValue,
99
			final String resultSizeParam,
100 53854 alessia.ba
			final String resultSizeValueStr,
101 50582 jochen.sch
			final String queryParams,
102
			final String entityXpath
103 53854 alessia.ba
	) {
104 58456 andreas.cz
		this(baseUrl,resumptionType,resumptionParam,resumptionXpath,resultTotalXpath,resultFormatParam,resultFormatValue,resultSizeParam,resultSizeValueStr,queryParams,entityXpath,"", "");
105 58315 andreas.cz
	}
106
107 58457 andreas.cz
	public RestIterator(
108
			final String baseUrl,
109
			final String resumptionType,
110
			final String resumptionParam,
111
			final String resumptionXpath,
112
			final String resultTotalXpath,
113
			final String resultFormatParam,
114
			final String resultFormatValue,
115
			final String resultSizeParam,
116
			final String resultSizeValueStr,
117
			final String queryParams,
118
			final String entityXpath,
119
			final String authMethod,
120
			final String authToken,
121
			final String resultOffsetParam
122
	) {
123
		this(baseUrl,resumptionType,resumptionParam,resumptionXpath,resultTotalXpath,resultFormatParam,resultFormatValue,resultSizeParam,resultSizeValueStr,queryParams,entityXpath,"", "");
124
	}
125
126 58427 andreas.cz
	/** RestIterator class
127
	 *  compatible to version 1.3.33
128
	 * @param baseUrl
129
	 * @param resumptionType
130
	 * @param resumptionParam
131
	 * @param resumptionXpath
132
	 * @param resultTotalXpath
133
	 * @param resultFormatParam
134
	 * @param resultFormatValue
135
	 * @param resultSizeParam
136
	 * @param resultSizeValueStr
137
	 * @param queryParams
138
	 * @param entityXpath
139
	 * @param authMethod
140
	 * @param authToken
141
	 */
142 58315 andreas.cz
	public RestIterator(
143
			final String baseUrl,
144
			final String resumptionType,
145
			final String resumptionParam,
146
			final String resumptionXpath,
147
			final String resultTotalXpath,
148
			final String resultFormatParam,
149
			final String resultFormatValue,
150
			final String resultSizeParam,
151
			final String resultSizeValueStr,
152
			final String queryParams,
153
			final String entityXpath,
154
			final String authMethod,
155
			final String authToken
156
	) {
157 58993 alessia.ba
		this.jsonUtils = new JsonUtils();
158 50066 jochen.sch
		this.baseUrl = baseUrl;
159
		this.resumptionType = resumptionType;
160
		this.resumptionParam = resumptionParam;
161
		this.resultFormatValue = resultFormatValue;
162
		this.queryParams = queryParams;
163 53854 alessia.ba
		this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
164 58315 andreas.cz
		this.authMethod = authMethod;
165
		this.authToken = authToken;
166 50066 jochen.sch
167 53854 alessia.ba
		queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
168
		querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
169
170 50066 jochen.sch
		try {
171 50582 jochen.sch
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
172 53854 alessia.ba
		} catch (Exception e) {
173 50584 claudio.at
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
174 50066 jochen.sch
		}
175 53854 alessia.ba
		initQueue();
176 50066 jochen.sch
	}
177 58315 andreas.cz
178
179 53854 alessia.ba
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath)
180
			throws TransformerConfigurationException, XPathExpressionException {
181 50066 jochen.sch
		transformer = TransformerFactory.newInstance().newTransformer();
182 53854 alessia.ba
		transformer.setOutputProperty(OutputKeys.INDENT, "yes");
183
		transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
184
		xpath = XPathFactory.newInstance().newXPath();
185 50066 jochen.sch
		xprResultTotalPath = xpath.compile(resultTotalXpath);
186 53854 alessia.ba
		xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
187
		xprEntity = xpath.compile(entityXpath);
188 50066 jochen.sch
	}
189 53854 alessia.ba
190 50066 jochen.sch
	private void initQueue() {
191 52997 andreas.cz
		query = baseUrl + "?" + queryParams + querySize + queryFormat;
192 50066 jochen.sch
	}
193 53854 alessia.ba
194 50066 jochen.sch
	private void disconnect() {
195
		// TODO close inputstream
196
	}
197 53854 alessia.ba
198 50066 jochen.sch
	/* (non-Javadoc)
199
	 * @see java.util.Iterator#hasNext()
200
	 */
201
	@Override
202
	public boolean hasNext() {
203 50582 jochen.sch
		if (recordQueue.isEmpty() && query.isEmpty()) {
204 50066 jochen.sch
			disconnect();
205
			return false;
206
		} else {
207
			return true;
208
		}
209
	}
210
211
	/* (non-Javadoc)
212
	 * @see java.util.Iterator#next()
213
	 */
214
	@Override
215
	public String next() {
216 50582 jochen.sch
		synchronized (recordQueue) {
217 53854 alessia.ba
			while (recordQueue.isEmpty() && !query.isEmpty()) {
218 50582 jochen.sch
				try {
219 58603 alessia.ba
					log.debug("get Query: " + query);
220 50582 jochen.sch
					query = downloadPage(query);
221 53854 alessia.ba
					log.debug("next queryURL from downloadPage(): " + query);
222
				} catch (CollectorServiceException e) {
223
					log.debug("CollectorPlugin.next()-Exception: " + e);
224 50582 jochen.sch
					throw new RuntimeException(e);
225
				}
226
			}
227
			return recordQueue.poll();
228
		}
229
	}
230 53854 alessia.ba
231 50582 jochen.sch
	/*
232
	 * download page and return nextQuery
233
	 */
234 53854 alessia.ba
	private String downloadPage(String query) throws CollectorServiceException {
235 50066 jochen.sch
		String resultJson;
236 53116 andreas.cz
		String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
237 50582 jochen.sch
		String nextQuery = "";
238 58993 alessia.ba
		String emptyXml = resultXml + "<" + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">";
239 53854 alessia.ba
		Node resultNode = null;
240
		NodeList nodeList = null;
241
		String qUrlArgument = "";
242
		int urlOldResumptionSize = 0;
243 58315 andreas.cz
		InputStream theHttpInputStream;
244 58520 andreas.cz
245
		// check if cursor=* is initial set otherwise add it to the queryParam URL
246
		if( resumptionType.equalsIgnoreCase("deep-cursor") ) {
247
			log.debug("check resumptionType deep-cursor and check cursor=*?" + query);
248
			if(!query.contains("&cursor=")) {
249
				query += "&cursor=*";
250
			}
251
		}
252 53854 alessia.ba
253 50066 jochen.sch
		try {
254 53854 alessia.ba
			URL qUrl = new URL(query);
255 59844 miriam.bag
			log.debug("authMethod :" + authMethod);
256 58315 andreas.cz
			if (this.authMethod == "bearer") {
257 58993 alessia.ba
				log.trace("authMethod before inputStream: " + resultXml);
258 59842 miriam.bag
				log.debug("authMethod :" + authMethod);
259 58315 andreas.cz
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
260
	        	conn.setRequestProperty("Authorization","Bearer "+authToken);
261
	        	conn.setRequestProperty("Content-Type","application/json");
262 59836 miriam.bag
	        	conn.setRequestMethod("GET");
263 58315 andreas.cz
	        	theHttpInputStream = conn.getInputStream();
264 59841 miriam.bag
			}else if (this.authMethod == "basic") {
265 59836 miriam.bag
				log.trace("authMethod before inputStream: " + resultXml);
266 59842 miriam.bag
				log.debug("authMethod :" + authMethod);
267 59836 miriam.bag
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
268
				conn.setRequestProperty("Authorization","Basic "+authToken);
269
				conn.setRequestProperty("accept","application/xml");
270
				conn.setRequestMethod("GET");
271 59841 miriam.bag
				log.debug("header :" + conn.getHeaderFields());
272
				log.debug("url : " + qUrl.getPath());
273 59836 miriam.bag
				theHttpInputStream = conn.getInputStream();
274 59841 miriam.bag
			} else {
275 58315 andreas.cz
				theHttpInputStream = qUrl.openStream();
276
			}
277
278
			resultStream = theHttpInputStream;
279 53854 alessia.ba
			if ("json".equals(resultFormatValue.toLowerCase())) {
280
				resultJson = IOUtils.toString(resultStream, "UTF-8");
281 58993 alessia.ba
				resultXml = jsonUtils.convertToXML(resultJson);
282 53854 alessia.ba
				resultStream = IOUtils.toInputStream(resultXml, "UTF-8");
283 50066 jochen.sch
			}
284 53854 alessia.ba
285
			if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
286
				resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
287
				nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
288
				log.debug("nodeList.length: " + nodeList.getLength());
289
				for (int i = 0; i < nodeList.getLength(); i++) {
290
					StringWriter sw = new StringWriter();
291
					transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
292
					recordQueue.add(sw.toString());
293
				}
294
			} else { log.info("resultXml is equal with emptyXml"); }
295
296 50066 jochen.sch
			resumptionInt += resultSizeValue;
297 52983 andreas.cz
298 53854 alessia.ba
			switch (resumptionType.toLowerCase()) {
299
			case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
300
				resumptionStr = xprResumptionPath.evaluate(resultNode);
301
				break;
302 52971 andreas.cz
303 53854 alessia.ba
			case "count":   // begin at one step for all records, iterate over items
304
				resumptionStr = Integer.toString(resumptionInt);
305
				break;
306
307
			case "discover":   // size of result items unknown, iterate over items  (for openDOAR - 201808)
308
				if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: discover, Param 'resultSizeValue' is less than 2");}
309
				qUrlArgument = qUrl.getQuery();
310
				String[] arrayQUrlArgument = qUrlArgument.split("&");
311
				for (String arrayUrlArgStr : arrayQUrlArgument) {
312
					if (arrayUrlArgStr.startsWith(resumptionParam)) {
313
						String[] resumptionKeyValue = arrayUrlArgStr.split("=");
314 58427 andreas.cz
						if(isInteger(resumptionKeyValue[1])) {
315
							urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
316
							log.debug("discover OldResumptionSize from Url (int): " + urlOldResumptionSize);
317
						} else {
318
							log.debug("discover OldResumptionSize from Url (str): " + resumptionKeyValue[1]);
319
						}
320 53854 alessia.ba
					}
321
				}
322
323
				if (((emptyXml.toLowerCase()).equals(resultXml.toLowerCase()))
324
						|| ((nodeList != null) && (nodeList.getLength() < resultSizeValue))
325
				) {
326
					// resumptionStr = "";
327
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
328
					resultTotal = discoverResultSize;
329
				} else {
330
					resumptionStr = Integer.toString(resumptionInt);
331
					resultTotal = resumptionInt + 1;
332
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
333
				}
334 58603 alessia.ba
				log.debug("discoverResultSize:  " + discoverResultSize);
335 53854 alessia.ba
				break;
336
337
			case "pagination":
338 58427 andreas.cz
			case "page":         // pagination, iterate over page numbers
339 53854 alessia.ba
				pagination += 1;
340
				if (nodeList != null) {
341
					discoverResultSize += nodeList.getLength();
342
				} else {
343
					resultTotal = discoverResultSize;
344
					pagination = discoverResultSize;
345
				}
346
				resumptionInt = pagination;
347
				resumptionStr = Integer.toString(resumptionInt);
348
				break;
349
350 58427 andreas.cz
			case "deep-cursor":   // size of result items unknown, iterate over items  (for supporting deep cursor in solr)
351 58449 andreas.cz
				// isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: deep-cursor, Param 'resultSizeValue' is less than 2");}
352 58427 andreas.cz
353 58456 andreas.cz
				resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode));
354 58452 andreas.cz
				queryParams = queryParams.replace("&cursor=*", "");
355 58427 andreas.cz
356 58520 andreas.cz
				// terminating if length of nodeList is 0
357 58532 andreas.cz
				if( (nodeList != null) && (nodeList.getLength() < discoverResultSize) ) {
358
					resumptionInt += ( nodeList.getLength() + 1 - resultSizeValue);
359 58520 andreas.cz
				} else {
360
					resumptionInt += (nodeList.getLength() - resultSizeValue);	// subtract the resultSizeValue because the iteration is over real length and the resultSizeValue is added before the switch()
361
				}
362 58532 andreas.cz
363
				discoverResultSize = nodeList.getLength();
364
365 58488 andreas.cz
				log.debug("downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams=" + queryParams + " resumptionLengthIncreased: " + resumptionInt);
366 58427 andreas.cz
367
				break;
368
369 53854 alessia.ba
			default:        // otherwise: abort
370
				// resultTotal = resumptionInt;
371
				break;
372
			}
373
374
		} catch (Exception e) {
375 50584 claudio.at
			log.error(e);
376
			throw new IllegalStateException("collection failed: " + e.getMessage());
377 58427 andreas.cz
		}
378
379 58428 andreas.cz
		try {
380
			if (resultTotal == -1) {
381
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
382
				if (resumptionType.toLowerCase().equals("page")) { resultTotal += 1; }           // to correct the upper bound
383
				log.info("resultTotal was -1 is now: " + resultTotal);
384 50066 jochen.sch
		}
385 58428 andreas.cz
		} catch(Exception e) {
386
			log.error(e);
387
			throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage());
388
		}
389 58603 alessia.ba
		log.debug("resultTotal: " + resultTotal);
390
		log.debug("resInt: " + resumptionInt);
391 58427 andreas.cz
		if (resumptionInt <= resultTotal) {
392
			nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
393 58528 andreas.cz
		} else {
394 58427 andreas.cz
			nextQuery = "";
395 58532 andreas.cz
			// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; }    	// correct the resumptionInt and prevent a NullPointer Exception at mdStore
396 58528 andreas.cz
		}
397 58427 andreas.cz
		log.debug("nextQueryUrl: " + nextQuery);
398
		return nextQuery;
399
400
401 50066 jochen.sch
	}
402
403 52970 andreas.cz
404 58427 andreas.cz
405
	private boolean isInteger(String s) {
406
		boolean isValidInteger = false;
407
		try {
408
			Integer.parseInt(s);
409 53854 alessia.ba
410 58427 andreas.cz
			// s is a valid integer
411
412
			isValidInteger = true;
413
		} catch (NumberFormatException ex) {
414
			// s is not an integer
415
		}
416
417
		return isValidInteger;
418
	}
419 58456 andreas.cz
420
	// Method to encode a string value using `UTF-8` encoding scheme
421
    private String encodeValue(String value) {
422
        try {
423
            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
424
        } catch (UnsupportedEncodingException ex) {
425
            throw new RuntimeException(ex.getCause());
426
        }
427
    }
428 58427 andreas.cz
429 50066 jochen.sch
}