Project

General

Profile

1
/**
 * log.debug(...) is equal to log.trace(...) in the application logs.
 * <p>
 * Known bug: with resumptionType 'discover', collecting fails if (resultTotal % resultSizeValue) == 0;
 * as a workaround, change the resultSizeValue.
 */
6
package eu.dnetlib.data.collector.plugins.rest;
7

    
8
import java.io.InputStream;
9
import java.io.StringWriter;
10
import java.io.UnsupportedEncodingException;
11
import java.net.URL;
12
import java.net.URLEncoder;
13
import java.nio.charset.StandardCharsets;
14
import java.net.HttpURLConnection;
15
import java.util.Iterator;
16
import java.util.Queue;
17
import java.util.concurrent.PriorityBlockingQueue;
18
import javax.xml.transform.OutputKeys;
19
import javax.xml.transform.Transformer;
20
import javax.xml.transform.TransformerConfigurationException;
21
import javax.xml.transform.TransformerFactory;
22
import javax.xml.transform.dom.DOMSource;
23
import javax.xml.transform.stream.StreamResult;
24
import javax.xml.xpath.*;
25

    
26
import eu.dnetlib.data.collector.plugins.oai.engine.XmlCleaner;
27
import eu.dnetlib.data.collector.plugins.utils.JsonUtils;
28
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
29
import org.apache.commons.io.IOUtils;
30
import org.apache.commons.lang3.StringUtils;
31
import org.apache.commons.logging.Log;
32
import org.apache.commons.logging.LogFactory;
33
import org.w3c.dom.Node;
34
import org.w3c.dom.NodeList;
35
import org.xml.sax.InputSource;
36

    
37
/**
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
 * @date 2020-04-09
 */
42
public class RestIterator implements Iterator<String> {
43
	private final String BASIC = "basic";
44

    
45
	// TODO: clean up the comments of replaced source code
46
	private static final Log log = LogFactory.getLog(RestIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
47

    
48
	private JsonUtils jsonUtils;
49

    
50
	private String baseUrl;
51
	private String resumptionType;
52
	private String resumptionParam;
53
	private String resultFormatValue;
54
	private String queryParams;
55
	private int resultSizeValue;
56
	private int resumptionInt = 0;            // integer resumption token (first record to harvest)
57
	private int resultTotal = -1;
58
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
59
	private InputStream resultStream;
60
	private Transformer transformer;
61
	private XPath xpath;
62
	private String query;
63
	private XPathExpression xprResultTotalPath;
64
	private XPathExpression xprResumptionPath;
65
	private XPathExpression xprEntity;
66
	private String queryFormat;
67
	private String querySize;
68
	private String authMethod;
69
	private String authToken;
70
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
71
	private int discoverResultSize = 0;
72
	private int pagination = 1;
73
	
74

    
75
	/**
	 * Creates an iterator that harvests without authentication.
	 *
	 * Signature compatible with versions before 1.3.33: delegates to the
	 * authentication-aware constructor with an empty authMethod and authToken.
	 *
	 * @param baseUrl            URL of the endpoint; the query string is appended after a "?"
	 * @param resumptionType     paging strategy: scan, count, discover, pagination, page or deep-cursor
	 * @param resumptionParam    name of the URL parameter that carries the resumption value
	 * @param resumptionXpath    XPath yielding the next resumption token; "/" is used when blank
	 * @param resultTotalXpath   XPath yielding the total number of results
	 * @param resultFormatParam  name of the URL parameter selecting the result format; omitted when blank
	 * @param resultFormatValue  result format; "json" responses are converted to XML
	 * @param resultSizeParam    name of the URL parameter selecting the page size; omitted when blank
	 * @param resultSizeValueStr page size as a decimal integer string
	 * @param queryParams        additional query string placed directly after the "?"
	 * @param entityXpath        XPath selecting the record nodes of a page
	 */
	public RestIterator(
			final String baseUrl,
			final String resumptionType,
			final String resumptionParam,
			final String resumptionXpath,
			final String resultTotalXpath,
			final String resultFormatParam,
			final String resultFormatValue,
			final String resultSizeParam,
			final String resultSizeValueStr,
			final String queryParams,
			final String entityXpath
	) {
		this(baseUrl, resumptionType, resumptionParam, resumptionXpath, resultTotalXpath,
				resultFormatParam, resultFormatValue, resultSizeParam, resultSizeValueStr,
				queryParams, entityXpath, "", "");
	}
107

    
108
	/**
	 * Creates an iterator with authentication and an (unused) result offset parameter.
	 *
	 * Delegates to the authentication-aware constructor and forwards the given
	 * authMethod and authToken, so that requests are actually authenticated.
	 *
	 * @param baseUrl            URL of the endpoint; the query string is appended after a "?"
	 * @param resumptionType     paging strategy: scan, count, discover, pagination, page or deep-cursor
	 * @param resumptionParam    name of the URL parameter that carries the resumption value
	 * @param resumptionXpath    XPath yielding the next resumption token; "/" is used when blank
	 * @param resultTotalXpath   XPath yielding the total number of results
	 * @param resultFormatParam  name of the URL parameter selecting the result format; omitted when blank
	 * @param resultFormatValue  result format; "json" responses are converted to XML
	 * @param resultSizeParam    name of the URL parameter selecting the page size; omitted when blank
	 * @param resultSizeValueStr page size as a decimal integer string
	 * @param queryParams        additional query string placed directly after the "?"
	 * @param entityXpath        XPath selecting the record nodes of a page
	 * @param authMethod         "bearer" or "basic"; any other value means no authentication
	 * @param authToken          token sent in the Authorization header
	 * @param resultOffsetParam  accepted for signature compatibility only; currently not used
	 */
	public RestIterator(
			final String baseUrl,
			final String resumptionType,
			final String resumptionParam,
			final String resumptionXpath,
			final String resultTotalXpath,
			final String resultFormatParam,
			final String resultFormatValue,
			final String resultSizeParam,
			final String resultSizeValueStr,
			final String queryParams,
			final String entityXpath,
			final String authMethod,
			final String authToken,
			final String resultOffsetParam
	) {
		// Forward the credentials; passing "" here would silently disable authentication.
		this(baseUrl, resumptionType, resumptionParam, resumptionXpath, resultTotalXpath,
				resultFormatParam, resultFormatValue, resultSizeParam, resultSizeValueStr,
				queryParams, entityXpath, authMethod, authToken);
	}
126

    
127
	/**
	 * Creates an iterator over the records of a paged REST source.
	 *
	 * Signature compatible with version 1.3.33. Stores the configuration, prepares the
	 * XML transformer and XPath expressions and builds the URL of the first page.
	 * No request is sent until {@link #next()} is called.
	 *
	 * @param baseUrl            URL of the endpoint; the query string is appended after a "?"
	 * @param resumptionType     paging strategy: scan, count, discover, pagination, page or deep-cursor
	 * @param resumptionParam    name of the URL parameter that carries the resumption value
	 * @param resumptionXpath    XPath yielding the next resumption token; "/" is used when blank
	 * @param resultTotalXpath   XPath yielding the total number of results
	 * @param resultFormatParam  name of the URL parameter selecting the result format; omitted when blank
	 * @param resultFormatValue  result format; "json" responses are converted to XML
	 * @param resultSizeParam    name of the URL parameter selecting the page size; omitted when blank
	 * @param resultSizeValueStr page size as a decimal integer string
	 * @param queryParams        additional query string placed directly after the "?"
	 * @param entityXpath        XPath selecting the record nodes of a page
	 * @param authMethod         "bearer" or "basic"; any other value means no authentication
	 * @param authToken          token sent in the Authorization header
	 * @throws NumberFormatException if resultSizeValueStr is not a valid integer
	 * @throws IllegalStateException if the transformer or an XPath expression cannot be created
	 */
	public RestIterator(
			final String baseUrl,
			final String resumptionType,
			final String resumptionParam,
			final String resumptionXpath,
			final String resultTotalXpath,
			final String resultFormatParam,
			final String resultFormatValue,
			final String resultSizeParam,
			final String resultSizeValueStr,
			final String queryParams,
			final String entityXpath,
			final String authMethod,
			final String authToken
	) {
		this.jsonUtils = new JsonUtils();
		this.baseUrl = baseUrl;
		this.resumptionType = resumptionType;
		this.resumptionParam = resumptionParam;
		this.resultFormatValue = resultFormatValue;
		this.queryParams = queryParams;
		this.resultSizeValue = Integer.parseInt(resultSizeValueStr);
		this.authMethod = authMethod;
		this.authToken = authToken;

		// Optional URL fragments: left empty when the corresponding parameter name is blank.
		queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
		querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";

		try {
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
		} catch (Exception e) {
			// keep the original exception as cause so the failing XPath/transformer setup can be diagnosed
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage(), e);
		}
		initQueue();
	}
178
	
179
	
180
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath)
181
			throws TransformerConfigurationException, XPathExpressionException {
182
		transformer = TransformerFactory.newInstance().newTransformer();
183
		transformer.setOutputProperty(OutputKeys.INDENT, "yes");
184
		transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
185
		xpath = XPathFactory.newInstance().newXPath();
186
		xprResultTotalPath = xpath.compile(resultTotalXpath);
187
		xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
188
		xprEntity = xpath.compile(entityXpath);
189
	}
190

    
191
	private void initQueue() {
192
		query = baseUrl + "?" + queryParams + querySize + queryFormat;
193
	}
194

    
195
	private void disconnect() {
196
		// TODO close inputstream
197
	}
198

    
199
	/* (non-Javadoc)
200
	 * @see java.util.Iterator#hasNext()
201
	 */
202
	@Override
203
	public boolean hasNext() {
204
		if (recordQueue.isEmpty() && query.isEmpty()) {
205
			disconnect();
206
			return false;
207
		} else {
208
			return true;
209
		}
210
	}
211

    
212
	/* (non-Javadoc)
213
	 * @see java.util.Iterator#next()
214
	 */
215
	@Override
216
	public String next() {
217
		synchronized (recordQueue) {
218
			while (recordQueue.isEmpty() && !query.isEmpty()) {
219
				try {
220
					log.debug("get Query: " + query);
221
					query = downloadPage(query);
222
					log.debug("next queryURL from downloadPage(): " + query);
223
				} catch (CollectorServiceException e) {
224
					log.debug("CollectorPlugin.next()-Exception: " + e);
225
					throw new RuntimeException(e);
226
				}
227
			}
228
			return recordQueue.poll();
229
		}
230
	}
231

    
232
	/*
233
	 * download page and return nextQuery
234
	 */
235
	private String downloadPage(String query) throws CollectorServiceException {
236
		String resultJson;
237
		String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
238
		String nextQuery = "";
239
		String emptyXml = resultXml + "<" + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">";
240
		Node resultNode = null;
241
		NodeList nodeList = null;
242
		String qUrlArgument = "";
243
		int urlOldResumptionSize = 0;
244
		InputStream theHttpInputStream;
245
		
246
		// check if cursor=* is initial set otherwise add it to the queryParam URL
247
		if( resumptionType.equalsIgnoreCase("deep-cursor") ) {
248
			log.debug("check resumptionType deep-cursor and check cursor=*?" + query);
249
			if(!query.contains("&cursor=")) {
250
				query += "&cursor=*";
251
			}
252
		}
253

    
254
		try {
255
			URL qUrl = new URL(query);
256
			log.debug("authMethod :" + authMethod);
257
			if (this.authMethod == "bearer") {
258
				log.trace("authMethod before inputStream: " + resultXml);
259
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
260
	        	conn.setRequestProperty("Authorization","Bearer "+authToken);
261
	        	conn.setRequestProperty("Content-Type","application/json");
262
	        	conn.setRequestMethod("GET");
263
	        	theHttpInputStream = conn.getInputStream();
264
			}else if (BASIC.equalsIgnoreCase(this.authMethod)) {
265
				log.trace("authMethod before inputStream: " + resultXml);
266
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
267
				conn.setRequestProperty("Authorization","Basic "+authToken);
268
				conn.setRequestProperty("accept","application/xml");
269
				conn.setRequestMethod("GET");
270
				theHttpInputStream = conn.getInputStream();
271
			} else {
272
				theHttpInputStream = qUrl.openStream();
273
			}
274
			
275
			resultStream = theHttpInputStream;
276
			if ("json".equals(resultFormatValue.toLowerCase())) {
277
				resultJson = IOUtils.toString(resultStream, "UTF-8");
278
				resultXml = jsonUtils.convertToXML(resultJson);
279
				resultStream = IOUtils.toInputStream(resultXml, "UTF-8");
280
			}
281

    
282
			if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
283
				resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
284
				nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
285
				log.debug("nodeList.length: " + nodeList.getLength());
286
				for (int i = 0; i < nodeList.getLength(); i++) {
287
					StringWriter sw = new StringWriter();
288
					transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
289
					recordQueue.add(sw.toString());
290
				}
291
			} else { log.info("resultXml is equal with emptyXml"); }
292

    
293
			resumptionInt += resultSizeValue;
294

    
295
			switch (resumptionType.toLowerCase()) {
296
			case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
297
				resumptionStr = xprResumptionPath.evaluate(resultNode);
298
				break;
299

    
300
			case "count":   // begin at one step for all records, iterate over items
301
				resumptionStr = Integer.toString(resumptionInt);
302
				break;
303

    
304
			case "discover":   // size of result items unknown, iterate over items  (for openDOAR - 201808)
305
				if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: discover, Param 'resultSizeValue' is less than 2");}
306
				qUrlArgument = qUrl.getQuery();
307
				String[] arrayQUrlArgument = qUrlArgument.split("&");
308
				for (String arrayUrlArgStr : arrayQUrlArgument) {
309
					if (arrayUrlArgStr.startsWith(resumptionParam)) {
310
						String[] resumptionKeyValue = arrayUrlArgStr.split("=");
311
						if(isInteger(resumptionKeyValue[1])) {
312
							urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
313
							log.debug("discover OldResumptionSize from Url (int): " + urlOldResumptionSize);
314
						} else {
315
							log.debug("discover OldResumptionSize from Url (str): " + resumptionKeyValue[1]);
316
						}
317
					}
318
				}
319

    
320
				if (((emptyXml.toLowerCase()).equals(resultXml.toLowerCase()))
321
						|| ((nodeList != null) && (nodeList.getLength() < resultSizeValue))
322
				) {
323
					// resumptionStr = "";
324
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
325
					resultTotal = discoverResultSize;
326
				} else {
327
					resumptionStr = Integer.toString(resumptionInt);
328
					resultTotal = resumptionInt + 1;
329
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
330
				}
331
				log.debug("discoverResultSize:  " + discoverResultSize);
332
				break;
333

    
334
			case "pagination":
335
			case "page":         // pagination, iterate over page numbers
336
				pagination += 1;
337
				if (nodeList != null) {
338
					discoverResultSize += nodeList.getLength();
339
				} else {
340
					resultTotal = discoverResultSize;
341
					pagination = discoverResultSize;
342
				}
343
				resumptionInt = pagination;
344
				resumptionStr = Integer.toString(resumptionInt);
345
				break;
346

    
347
			case "deep-cursor":   // size of result items unknown, iterate over items  (for supporting deep cursor in solr)
348
				// isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: deep-cursor, Param 'resultSizeValue' is less than 2");}
349

    
350
				resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode));
351
				queryParams = queryParams.replace("&cursor=*", "");
352
				
353
				// terminating if length of nodeList is 0
354
				if( (nodeList != null) && (nodeList.getLength() < discoverResultSize) ) {
355
					resumptionInt += ( nodeList.getLength() + 1 - resultSizeValue);
356
				} else {
357
					resumptionInt += (nodeList.getLength() - resultSizeValue);	// subtract the resultSizeValue because the iteration is over real length and the resultSizeValue is added before the switch()
358
				}
359
				
360
				discoverResultSize = nodeList.getLength();
361
				
362
				log.debug("downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams=" + queryParams + " resumptionLengthIncreased: " + resumptionInt);
363

    
364
				break;
365
			
366
			default:        // otherwise: abort
367
				// resultTotal = resumptionInt;
368
				break;
369
			}
370

    
371
		} catch (Exception e) {
372
			log.error(e);
373
			throw new IllegalStateException("collection failed: " + e.getMessage());
374
		}			
375
			
376
		try {
377
			if (resultTotal == -1) {
378
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
379
				if (resumptionType.toLowerCase().equals("page") && !BASIC.equalsIgnoreCase(authMethod)) { resultTotal += 1; }           // to correct the upper bound
380
				log.info("resultTotal was -1 is now: " + resultTotal);
381
		}
382
		} catch(Exception e) {
383
			log.error(e);
384
			throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage());
385
		}
386
		log.debug("resultTotal: " + resultTotal);
387
		log.debug("resInt: " + resumptionInt);
388
		if (resumptionInt <= resultTotal) {
389
			nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
390
		} else {
391
			nextQuery = "";
392
			// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; }    	// correct the resumptionInt and prevent a NullPointer Exception at mdStore 
393
		}
394
		log.debug("nextQueryUrl: " + nextQuery);
395
		return nextQuery;
396

    
397

    
398
	}
399

    
400

    
401
	
402
	private boolean isInteger(String s) {
403
		boolean isValidInteger = false;
404
		try {
405
			Integer.parseInt(s);
406

    
407
			// s is a valid integer
408

    
409
			isValidInteger = true;
410
		} catch (NumberFormatException ex) {
411
			// s is not an integer
412
		}
413

    
414
		return isValidInteger;
415
	}
416
	
417
	// Method to encode a string value using `UTF-8` encoding scheme
418
    private String encodeValue(String value) {
419
        try {
420
            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
421
        } catch (UnsupportedEncodingException ex) {
422
            throw new RuntimeException(ex.getCause());
423
        }
424
    }
425

    
426
}
(2-2/2)