Project

General

Profile

1
/**
 * log.debug(...) is equivalent to log.trace(...) in the application logs.
 * <p>
 * Known bug: with resumptionType 'discover', collecting fails if (resultTotal % resultSizeValue) == 0;
 * as a workaround, change the resultSizeValue.
 */
6
package eu.dnetlib.data.collector.plugins.rest;
7

    
8
import java.io.InputStream;
9
import java.io.StringWriter;
10
import java.io.UnsupportedEncodingException;
11
import java.net.URL;
12
import java.net.URLEncoder;
13
import java.nio.charset.StandardCharsets;
14
import java.net.HttpURLConnection;
15
import java.util.Iterator;
16
import java.util.Queue;
17
import java.util.concurrent.PriorityBlockingQueue;
18
import javax.xml.transform.OutputKeys;
19
import javax.xml.transform.Transformer;
20
import javax.xml.transform.TransformerConfigurationException;
21
import javax.xml.transform.TransformerFactory;
22
import javax.xml.transform.dom.DOMSource;
23
import javax.xml.transform.stream.StreamResult;
24
import javax.xml.xpath.*;
25

    
26
import eu.dnetlib.data.collector.plugins.oai.engine.XmlCleaner;
27
import eu.dnetlib.data.collector.plugins.utils.JsonUtils;
28
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
29
import org.apache.commons.io.IOUtils;
30
import org.apache.commons.lang3.StringUtils;
31
import org.apache.commons.logging.Log;
32
import org.apache.commons.logging.LogFactory;
33
import org.w3c.dom.Node;
34
import org.w3c.dom.NodeList;
35
import org.xml.sax.InputSource;
36

    
37
/**
38
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
39
 * @date 2020-04-09
40
 *
41
 */
42
public class RestIterator implements Iterator<String> {
43

    
44
	// TODO: clean up the comments of replaced source code
45
	private static final Log log = LogFactory.getLog(RestIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
46

    
47
	private JsonUtils jsonUtils;
48

    
49
	private String baseUrl;
50
	private String resumptionType;
51
	private String resumptionParam;
52
	private String resultFormatValue;
53
	private String queryParams;
54
	private int resultSizeValue;
55
	private int resumptionInt = 0;            // integer resumption token (first record to harvest)
56
	private int resultTotal = -1;
57
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
58
	private InputStream resultStream;
59
	private Transformer transformer;
60
	private XPath xpath;
61
	private String query;
62
	private XPathExpression xprResultTotalPath;
63
	private XPathExpression xprResumptionPath;
64
	private XPathExpression xprEntity;
65
	private String queryFormat;
66
	private String querySize;
67
	private String authMethod;
68
	private String authToken;
69
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
70
	private int discoverResultSize = 0;
71
	private int pagination = 1;
72
	
73

    
74
	/**
	 * Creates an iterator for an endpoint that is queried without authentication.
	 * <p>
	 * Keeps the signature used before version 1.3.33 and forwards every argument to the
	 * authentication-aware constructor, using an empty auth method and an empty auth token.
	 *
	 * @param baseUrl            endpoint URL without query string
	 * @param resumptionType     paging mode
	 * @param resumptionParam    name of the URL parameter carrying the resumption value
	 * @param resumptionXpath    XPath locating the resumption value in a response
	 * @param resultTotalXpath   XPath locating the total number of results in a response
	 * @param resultFormatParam  name of the URL parameter selecting the response format
	 * @param resultFormatValue  value of the response format parameter
	 * @param resultSizeParam    name of the URL parameter selecting the page size
	 * @param resultSizeValueStr page size as a decimal string
	 * @param queryParams        query-string fragment sent with every request
	 * @param entityXpath        XPath selecting the record nodes of a response
	 */
	public RestIterator(
			final String baseUrl,
			final String resumptionType,
			final String resumptionParam,
			final String resumptionXpath,
			final String resultTotalXpath,
			final String resultFormatParam,
			final String resultFormatValue,
			final String resultSizeParam,
			final String resultSizeValueStr,
			final String queryParams,
			final String entityXpath
	) {
		this(
				baseUrl,
				resumptionType,
				resumptionParam,
				resumptionXpath,
				resultTotalXpath,
				resultFormatParam,
				resultFormatValue,
				resultSizeParam,
				resultSizeValueStr,
				queryParams,
				entityXpath,
				"",   // no auth method
				"");  // no auth token
	}
106

    
107
	/**
	 * Creates an iterator for an endpoint that may require authentication and that is
	 * configured with a result-offset parameter.
	 * <p>
	 * The authentication settings are forwarded to the 13-argument constructor. Earlier this
	 * constructor forwarded two empty strings instead, so the caller's credentials were lost.
	 *
	 * @param baseUrl            endpoint URL without query string
	 * @param resumptionType     paging mode
	 * @param resumptionParam    name of the URL parameter carrying the resumption value
	 * @param resumptionXpath    XPath locating the resumption value in a response
	 * @param resultTotalXpath   XPath locating the total number of results in a response
	 * @param resultFormatParam  name of the URL parameter selecting the response format
	 * @param resultFormatValue  value of the response format parameter
	 * @param resultSizeParam    name of the URL parameter selecting the page size
	 * @param resultSizeValueStr page size as a decimal string
	 * @param queryParams        query-string fragment sent with every request
	 * @param entityXpath        XPath selecting the record nodes of a response
	 * @param authMethod         authentication method
	 * @param authToken          token sent when the authentication method asks for one
	 * @param resultOffsetParam  accepted for signature compatibility; currently not used
	 */
	public RestIterator(
			final String baseUrl,
			final String resumptionType,
			final String resumptionParam,
			final String resumptionXpath,
			final String resultTotalXpath,
			final String resultFormatParam,
			final String resultFormatValue,
			final String resultSizeParam,
			final String resultSizeValueStr,
			final String queryParams,
			final String entityXpath,
			final String authMethod,
			final String authToken,
			final String resultOffsetParam
	) {
		this(baseUrl, resumptionType, resumptionParam, resumptionXpath, resultTotalXpath, resultFormatParam,
				resultFormatValue, resultSizeParam, resultSizeValueStr, queryParams, entityXpath,
				authMethod, authToken);
	}
125

    
126
	/**
	 * Creates an iterator over the records of a paged REST endpoint (signature of version 1.3.33).
	 * <p>
	 * Stores the configuration, pre-builds the optional "&amp;name=value" fragments for the
	 * result format and the page size, compiles the XPath expressions and prepares the URL of
	 * the first page. Nothing is downloaded here; the first request happens in next().
	 *
	 * @param baseUrl            endpoint URL without query string
	 * @param resumptionType     paging mode
	 * @param resumptionParam    name of the URL parameter carrying the resumption value
	 * @param resumptionXpath    XPath locating the resumption value in a response; "/" is used when blank
	 * @param resultTotalXpath   XPath locating the total number of results in a response
	 * @param resultFormatParam  name of the URL parameter selecting the response format; omitted from the URL when blank
	 * @param resultFormatValue  value of the response format parameter
	 * @param resultSizeParam    name of the URL parameter selecting the page size; omitted from the URL when blank
	 * @param resultSizeValueStr page size as a decimal string
	 * @param queryParams        query-string fragment sent with every request
	 * @param entityXpath        XPath selecting the record nodes of a response
	 * @param authMethod         authentication method
	 * @param authToken          token sent when the authentication method asks for one
	 * @throws NumberFormatException if {@code resultSizeValueStr} is not a decimal integer
	 * @throws IllegalStateException if the XML transformer or one of the XPath expressions cannot be set up
	 */
	public RestIterator(
			final String baseUrl,
			final String resumptionType,
			final String resumptionParam,
			final String resumptionXpath,
			final String resultTotalXpath,
			final String resultFormatParam,
			final String resultFormatValue,
			final String resultSizeParam,
			final String resultSizeValueStr,
			final String queryParams,
			final String entityXpath,
			final String authMethod,
			final String authToken
	) {
		this.jsonUtils = new JsonUtils();
		this.baseUrl = baseUrl;
		this.resumptionType = resumptionType;
		this.resumptionParam = resumptionParam;
		this.resultFormatValue = resultFormatValue;
		this.queryParams = queryParams;
		// parseInt yields the primitive directly; valueOf would box only to unbox again
		this.resultSizeValue = Integer.parseInt(resultSizeValueStr);
		this.authMethod = authMethod;
		this.authToken = authToken;

		queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
		querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";

		try {
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
		} catch (Exception e) {
			// keep the original exception as cause so the stack trace of the real failure survives
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage(), e);
		}
		initQueue();
	}
177
	
178
	
179
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath)
180
			throws TransformerConfigurationException, XPathExpressionException {
181
		transformer = TransformerFactory.newInstance().newTransformer();
182
		transformer.setOutputProperty(OutputKeys.INDENT, "yes");
183
		transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
184
		xpath = XPathFactory.newInstance().newXPath();
185
		xprResultTotalPath = xpath.compile(resultTotalXpath);
186
		xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
187
		xprEntity = xpath.compile(entityXpath);
188
	}
189

    
190
	private void initQueue() {
191
		query = baseUrl + "?" + queryParams + querySize + queryFormat;
192
	}
193

    
194
	private void disconnect() {
195
		// TODO close inputstream
196
	}
197

    
198
	/* (non-Javadoc)
199
	 * @see java.util.Iterator#hasNext()
200
	 */
201
	@Override
202
	public boolean hasNext() {
203
		if (recordQueue.isEmpty() && query.isEmpty()) {
204
			disconnect();
205
			return false;
206
		} else {
207
			return true;
208
		}
209
	}
210

    
211
	/* (non-Javadoc)
212
	 * @see java.util.Iterator#next()
213
	 */
214
	@Override
215
	public String next() {
216
		synchronized (recordQueue) {
217
			while (recordQueue.isEmpty() && !query.isEmpty()) {
218
				try {
219
					log.debug("get Query: " + query);
220
					query = downloadPage(query);
221
					log.debug("next queryURL from downloadPage(): " + query);
222
				} catch (CollectorServiceException e) {
223
					log.debug("CollectorPlugin.next()-Exception: " + e);
224
					throw new RuntimeException(e);
225
				}
226
			}
227
			return recordQueue.poll();
228
		}
229
	}
230

    
231
	/*
232
	 * download page and return nextQuery
233
	 */
234
	private String downloadPage(String query) throws CollectorServiceException {
235
		String resultJson;
236
		String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
237
		String nextQuery = "";
238
		String emptyXml = resultXml + "<" + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">";
239
		Node resultNode = null;
240
		NodeList nodeList = null;
241
		String qUrlArgument = "";
242
		int urlOldResumptionSize = 0;
243
		InputStream theHttpInputStream;
244
		
245
		// check if cursor=* is initial set otherwise add it to the queryParam URL
246
		if( resumptionType.equalsIgnoreCase("deep-cursor") ) {
247
			log.debug("check resumptionType deep-cursor and check cursor=*?" + query);
248
			if(!query.contains("&cursor=")) {
249
				query += "&cursor=*";
250
			}
251
		}
252

    
253
		try {
254
			URL qUrl = new URL(query);
255
			
256
			if (this.authMethod == "bearer") {
257
				log.trace("authMethod before inputStream: " + resultXml);
258

    
259
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
260
	        	conn.setRequestProperty("Authorization","Bearer "+authToken);
261
	        	conn.setRequestProperty("Content-Type","application/json");
262
	        	conn.setRequestMethod("GET");   
263
	        	theHttpInputStream = conn.getInputStream();
264
			} else {
265
				theHttpInputStream = qUrl.openStream();
266
			}
267
			
268
			resultStream = theHttpInputStream;
269
			if ("json".equals(resultFormatValue.toLowerCase())) {
270
				resultJson = IOUtils.toString(resultStream, "UTF-8");
271
				resultXml = jsonUtils.convertToXML(resultJson);
272
				resultStream = IOUtils.toInputStream(resultXml, "UTF-8");
273
			}
274

    
275
			if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
276
				resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
277
				nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
278
				log.debug("nodeList.length: " + nodeList.getLength());
279
				for (int i = 0; i < nodeList.getLength(); i++) {
280
					StringWriter sw = new StringWriter();
281
					transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
282
					recordQueue.add(sw.toString());
283
				}
284
			} else { log.info("resultXml is equal with emptyXml"); }
285

    
286
			resumptionInt += resultSizeValue;
287

    
288
			switch (resumptionType.toLowerCase()) {
289
			case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
290
				resumptionStr = xprResumptionPath.evaluate(resultNode);
291
				break;
292

    
293
			case "count":   // begin at one step for all records, iterate over items
294
				resumptionStr = Integer.toString(resumptionInt);
295
				break;
296

    
297
			case "discover":   // size of result items unknown, iterate over items  (for openDOAR - 201808)
298
				if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: discover, Param 'resultSizeValue' is less than 2");}
299
				qUrlArgument = qUrl.getQuery();
300
				String[] arrayQUrlArgument = qUrlArgument.split("&");
301
				for (String arrayUrlArgStr : arrayQUrlArgument) {
302
					if (arrayUrlArgStr.startsWith(resumptionParam)) {
303
						String[] resumptionKeyValue = arrayUrlArgStr.split("=");
304
						if(isInteger(resumptionKeyValue[1])) {
305
							urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
306
							log.debug("discover OldResumptionSize from Url (int): " + urlOldResumptionSize);
307
						} else {
308
							log.debug("discover OldResumptionSize from Url (str): " + resumptionKeyValue[1]);
309
						}
310
					}
311
				}
312

    
313
				if (((emptyXml.toLowerCase()).equals(resultXml.toLowerCase()))
314
						|| ((nodeList != null) && (nodeList.getLength() < resultSizeValue))
315
				) {
316
					// resumptionStr = "";
317
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
318
					resultTotal = discoverResultSize;
319
				} else {
320
					resumptionStr = Integer.toString(resumptionInt);
321
					resultTotal = resumptionInt + 1;
322
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
323
				}
324
				log.debug("discoverResultSize:  " + discoverResultSize);
325
				break;
326

    
327
			case "pagination":
328
			case "page":         // pagination, iterate over page numbers
329
				pagination += 1;
330
				if (nodeList != null) {
331
					discoverResultSize += nodeList.getLength();
332
				} else {
333
					resultTotal = discoverResultSize;
334
					pagination = discoverResultSize;
335
				}
336
				resumptionInt = pagination;
337
				resumptionStr = Integer.toString(resumptionInt);
338
				break;
339

    
340
			case "deep-cursor":   // size of result items unknown, iterate over items  (for supporting deep cursor in solr)
341
				// isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: deep-cursor, Param 'resultSizeValue' is less than 2");}
342

    
343
				resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode));
344
				queryParams = queryParams.replace("&cursor=*", "");
345
				
346
				// terminating if length of nodeList is 0
347
				if( (nodeList != null) && (nodeList.getLength() < discoverResultSize) ) {
348
					resumptionInt += ( nodeList.getLength() + 1 - resultSizeValue);
349
				} else {
350
					resumptionInt += (nodeList.getLength() - resultSizeValue);	// subtract the resultSizeValue because the iteration is over real length and the resultSizeValue is added before the switch()
351
				}
352
				
353
				discoverResultSize = nodeList.getLength();
354
				
355
				log.debug("downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams=" + queryParams + " resumptionLengthIncreased: " + resumptionInt);
356

    
357
				break;
358
			
359
			default:        // otherwise: abort
360
				// resultTotal = resumptionInt;
361
				break;
362
			}
363

    
364
		} catch (Exception e) {
365
			log.error(e);
366
			throw new IllegalStateException("collection failed: " + e.getMessage());
367
		}			
368
			
369
		try {
370
			if (resultTotal == -1) {
371
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
372
				if (resumptionType.toLowerCase().equals("page")) { resultTotal += 1; }           // to correct the upper bound
373
				log.info("resultTotal was -1 is now: " + resultTotal);
374
		}
375
		} catch(Exception e) {
376
			log.error(e);
377
			throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage());
378
		}
379
		log.debug("resultTotal: " + resultTotal);
380
		log.debug("resInt: " + resumptionInt);
381
		if (resumptionInt <= resultTotal) {
382
			nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
383
		} else {
384
			nextQuery = "";
385
			// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; }    	// correct the resumptionInt and prevent a NullPointer Exception at mdStore 
386
		}
387
		log.debug("nextQueryUrl: " + nextQuery);
388
		return nextQuery;
389

    
390

    
391
	}
392

    
393

    
394
	
395
	private boolean isInteger(String s) {
396
		boolean isValidInteger = false;
397
		try {
398
			Integer.parseInt(s);
399

    
400
			// s is a valid integer
401

    
402
			isValidInteger = true;
403
		} catch (NumberFormatException ex) {
404
			// s is not an integer
405
		}
406

    
407
		return isValidInteger;
408
	}
409
	
410
	// Method to encode a string value using `UTF-8` encoding scheme
411
    private String encodeValue(String value) {
412
        try {
413
            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
414
        } catch (UnsupportedEncodingException ex) {
415
            throw new RuntimeException(ex.getCause());
416
        }
417
    }
418

    
419
}
(2-2/2)