Project

General

Profile

1
/**
 * Note: in the application logs, log.debug(...) output is equivalent to log.trace(...).
 * <p>
 * Known bug: with resumptionType 'discover', collecting fails if (resultTotal % resultSizeValue) == 0;
 * as a workaround, change the resultSizeValue.
 */
6
package eu.dnetlib.data.collector.plugins.rest;
7

    
8
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.*;

import eu.dnetlib.data.collector.plugins.oai.engine.XmlCleaner;
import eu.dnetlib.data.collector.plugins.utils.JsonUtils;
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
36

    
37
/**
38
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
39
 * @date 2020-04-09
40
 *
41
 */
42
public class RestIterator implements Iterator<String> {
43
	private final String BASIC = "basic";
44

    
45
	// TODO: clean up the comments of replaced source code
46
	private static final Log log = LogFactory.getLog(RestIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
47

    
48
	private JsonUtils jsonUtils;
49

    
50
	private String baseUrl;
51
	private String resumptionType;
52
	private String resumptionParam;
53
	private String resultFormatValue;
54
	private String queryParams;
55
	private int resultSizeValue;
56
	private int resumptionInt = 0;            // integer resumption token (first record to harvest)
57
	private int resultTotal = -1;
58
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
59
	private InputStream resultStream;
60
	private Transformer transformer;
61
	private XPath xpath;
62
	private String query;
63
	private XPathExpression xprResultTotalPath;
64
	private XPathExpression xprResumptionPath;
65
	private XPathExpression xprEntity;
66
	private String queryFormat;
67
	private String querySize;
68
	private String authMethod;
69
	private String authToken;
70
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
71
	private int discoverResultSize = 0;
72
	private int pagination = 1;
73
	/*
74
	While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in json.
75
	useful for cases when the target API expects a resultFormatValue != json, but the results are returned in json.
76
	An example is the EU Open Data Portal API: resultFormatValue=standard, results are in json format.
77
	 */
78
	private  String resultOutputFormat;
79
	
80

    
81
	public RestIterator(
82
			final String baseUrl,
83
			final String resumptionType,
84
			final String resumptionParam,
85
			final String resumptionXpath,
86
			final String resultTotalXpath,
87
			final String resultFormatParam,
88
			final String resultFormatValue,
89
			final String resultSizeParam,
90
			final String resultSizeValueStr,
91
			final String queryParams,
92
			final String entityXpath,
93
			final String authMethod,
94
			final String authToken,
95
			final String resultOutputFormat
96
	) {
97
		this.jsonUtils = new JsonUtils();
98
		this.baseUrl = baseUrl;
99
		this.resumptionType = resumptionType;
100
		this.resumptionParam = resumptionParam;
101
		this.resultFormatValue = resultFormatValue;
102
		this.queryParams = queryParams;
103
		this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
104
		this.authMethod = authMethod;
105
		this.authToken = authToken;
106
		this.resultOutputFormat = resultOutputFormat;
107

    
108
		queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
109
		querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
110

    
111
		try {
112
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
113
		} catch (Exception e) {
114
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
115
		}
116
		initQueue();
117
	}
118
	
119
	
120
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath)
121
			throws TransformerConfigurationException, XPathExpressionException {
122
		transformer = TransformerFactory.newInstance().newTransformer();
123
		transformer.setOutputProperty(OutputKeys.INDENT, "yes");
124
		transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
125
		xpath = XPathFactory.newInstance().newXPath();
126
		xprResultTotalPath = xpath.compile(resultTotalXpath);
127
		xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
128
		xprEntity = xpath.compile(entityXpath);
129
	}
130

    
131
	private void initQueue() {
132
		query = baseUrl + "?" + queryParams + querySize + queryFormat;
133
	}
134

    
135
	private void disconnect() {
136
		// TODO close inputstream
137
	}
138

    
139
	/* (non-Javadoc)
140
	 * @see java.util.Iterator#hasNext()
141
	 */
142
	@Override
143
	public boolean hasNext() {
144
		if (recordQueue.isEmpty() && query.isEmpty()) {
145
			disconnect();
146
			return false;
147
		} else {
148
			return true;
149
		}
150
	}
151

    
152
	/* (non-Javadoc)
153
	 * @see java.util.Iterator#next()
154
	 */
155
	@Override
156
	public String next() {
157
		synchronized (recordQueue) {
158
			while (recordQueue.isEmpty() && !query.isEmpty()) {
159
				try {
160
					log.debug("get Query: " + query);
161
					query = downloadPage(query);
162
					log.debug("next queryURL from downloadPage(): " + query);
163
				} catch (CollectorServiceException e) {
164
					log.debug("CollectorPlugin.next()-Exception: " + e);
165
					throw new RuntimeException(e);
166
				}
167
			}
168
			return recordQueue.poll();
169
		}
170
	}
171

    
172
	/*
173
	 * download page and return nextQuery
174
	 */
175
	private String downloadPage(String query) throws CollectorServiceException {
176
		String resultJson;
177
		String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
178
		String nextQuery = "";
179
		String emptyXml = resultXml + "<" + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">";
180
		Node resultNode = null;
181
		NodeList nodeList = null;
182
		String qUrlArgument = "";
183
		int urlOldResumptionSize = 0;
184
		InputStream theHttpInputStream;
185
		
186
		// check if cursor=* is initial set otherwise add it to the queryParam URL
187
		if( resumptionType.equalsIgnoreCase("deep-cursor") ) {
188
			log.debug("check resumptionType deep-cursor and check cursor=*?" + query);
189
			if(!query.contains("&cursor=")) {
190
				query += "&cursor=*";
191
			}
192
		}
193

    
194
		try {
195
			URL qUrl = new URL(query);
196
			log.debug("authMethod :" + authMethod);
197
			if (this.authMethod == "bearer") {
198
				log.trace("authMethod before inputStream: " + resultXml);
199
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
200
	        	conn.setRequestProperty("Authorization","Bearer "+authToken);
201
	        	conn.setRequestProperty("Content-Type","application/json");
202
	        	conn.setRequestMethod("GET");
203
	        	theHttpInputStream = conn.getInputStream();
204
			}else if (BASIC.equalsIgnoreCase(this.authMethod)) {
205
				log.trace("authMethod before inputStream: " + resultXml);
206
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
207
				conn.setRequestProperty("Authorization","Basic "+authToken);
208
				conn.setRequestProperty("accept","application/xml");
209
				conn.setRequestMethod("GET");
210
				theHttpInputStream = conn.getInputStream();
211
			} else {
212
				theHttpInputStream = qUrl.openStream();
213
			}
214
			
215
			resultStream = theHttpInputStream;
216
			if ("json".equals(resultOutputFormat)) {
217
				resultJson = IOUtils.toString(resultStream, "UTF-8");
218
				resultXml = jsonUtils.convertToXML(resultJson);
219
				resultStream = IOUtils.toInputStream(resultXml, "UTF-8");
220
			}
221

    
222
			if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
223
				resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
224
				nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
225
				log.debug("nodeList.length: " + nodeList.getLength());
226
				for (int i = 0; i < nodeList.getLength(); i++) {
227
					StringWriter sw = new StringWriter();
228
					transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
229
					recordQueue.add(sw.toString());
230
				}
231
			} else { log.info("resultXml is equal with emptyXml"); }
232

    
233
			resumptionInt += resultSizeValue;
234

    
235
			switch (resumptionType.toLowerCase()) {
236
			case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
237
				resumptionStr = xprResumptionPath.evaluate(resultNode);
238
				break;
239

    
240
			case "count":   // begin at one step for all records, iterate over items
241
				resumptionStr = Integer.toString(resumptionInt);
242
				break;
243

    
244
			case "discover":   // size of result items unknown, iterate over items  (for openDOAR - 201808)
245
				if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: discover, Param 'resultSizeValue' is less than 2");}
246
				qUrlArgument = qUrl.getQuery();
247
				String[] arrayQUrlArgument = qUrlArgument.split("&");
248
				for (String arrayUrlArgStr : arrayQUrlArgument) {
249
					if (arrayUrlArgStr.startsWith(resumptionParam)) {
250
						String[] resumptionKeyValue = arrayUrlArgStr.split("=");
251
						if(isInteger(resumptionKeyValue[1])) {
252
							urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
253
							log.debug("discover OldResumptionSize from Url (int): " + urlOldResumptionSize);
254
						} else {
255
							log.debug("discover OldResumptionSize from Url (str): " + resumptionKeyValue[1]);
256
						}
257
					}
258
				}
259

    
260
				if (((emptyXml.toLowerCase()).equals(resultXml.toLowerCase()))
261
						|| ((nodeList != null) && (nodeList.getLength() < resultSizeValue))
262
				) {
263
					// resumptionStr = "";
264
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
265
					resultTotal = discoverResultSize;
266
				} else {
267
					resumptionStr = Integer.toString(resumptionInt);
268
					resultTotal = resumptionInt + 1;
269
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
270
				}
271
				log.debug("discoverResultSize:  " + discoverResultSize);
272
				break;
273

    
274
			case "pagination":
275
			case "page":         // pagination, iterate over page numbers
276
				pagination += 1;
277
				if (nodeList != null) {
278
					discoverResultSize += nodeList.getLength();
279
				} else {
280
					resultTotal = discoverResultSize;
281
					pagination = discoverResultSize;
282
				}
283
				resumptionInt = pagination;
284
				resumptionStr = Integer.toString(resumptionInt);
285
				break;
286

    
287
			case "deep-cursor":   // size of result items unknown, iterate over items  (for supporting deep cursor in solr)
288
				// isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: deep-cursor, Param 'resultSizeValue' is less than 2");}
289

    
290
				resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode));
291
				queryParams = queryParams.replace("&cursor=*", "");
292
				
293
				// terminating if length of nodeList is 0
294
				if( (nodeList != null) && (nodeList.getLength() < discoverResultSize) ) {
295
					resumptionInt += ( nodeList.getLength() + 1 - resultSizeValue);
296
				} else {
297
					resumptionInt += (nodeList.getLength() - resultSizeValue);	// subtract the resultSizeValue because the iteration is over real length and the resultSizeValue is added before the switch()
298
				}
299
				
300
				discoverResultSize = nodeList.getLength();
301
				
302
				log.debug("downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams=" + queryParams + " resumptionLengthIncreased: " + resumptionInt);
303

    
304
				break;
305
			
306
			default:        // otherwise: abort
307
				// resultTotal = resumptionInt;
308
				break;
309
			}
310

    
311
		} catch (Exception e) {
312
			log.error(e);
313
			throw new IllegalStateException("collection failed: " + e.getMessage());
314
		}			
315
			
316
		try {
317
			if (resultTotal == -1) {
318
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
319
				if (resumptionType.toLowerCase().equals("page") && !BASIC.equalsIgnoreCase(authMethod)) { resultTotal += 1; }           // to correct the upper bound
320
				log.info("resultTotal was -1 is now: " + resultTotal);
321
		}
322
		} catch(Exception e) {
323
			log.error(e);
324
			throw new IllegalStateException("downloadPage resultTotal couldn't parse: " + e.getMessage());
325
		}
326
		log.debug("resultTotal: " + resultTotal);
327
		log.debug("resInt: " + resumptionInt);
328
		if (resumptionInt <= resultTotal) {
329
			nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
330
		} else {
331
			nextQuery = "";
332
			// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; }    	// correct the resumptionInt and prevent a NullPointer Exception at mdStore 
333
		}
334
		log.debug("nextQueryUrl: " + nextQuery);
335
		return nextQuery;
336

    
337

    
338
	}
339

    
340

    
341
	
342
	private boolean isInteger(String s) {
343
		boolean isValidInteger = false;
344
		try {
345
			Integer.parseInt(s);
346

    
347
			// s is a valid integer
348

    
349
			isValidInteger = true;
350
		} catch (NumberFormatException ex) {
351
			// s is not an integer
352
		}
353

    
354
		return isValidInteger;
355
	}
356
	
357
	// Method to encode a string value using `UTF-8` encoding scheme
358
    private String encodeValue(String value) {
359
        try {
360
            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
361
        } catch (UnsupportedEncodingException ex) {
362
            throw new RuntimeException(ex.getCause());
363
        }
364
    }
365

    
366
	public String getResultFormatValue() {
367
		return resultFormatValue;
368
	}
369

    
370
	public String getResultOutputFormat() {
371
		return resultOutputFormat;
372
	}
373

    
374
}
(2-2/2)