Project

General

Profile

1
/**
2
 * Note: log.debug(...) is equivalent to log.trace(...) in the application logs.
3
 * <p>
4
 * Known bug: with resumptionType 'discover', collecting fails if (resultTotal % resultSizeValue) == 0; as a workaround, change the resultSizeValue.
5
 */
6
package eu.dnetlib.data.collector.plugins.rest;
7

    
8
import java.io.InputStream;
9
import java.io.StringWriter;
10
import java.io.UnsupportedEncodingException;
11
import java.net.URL;
12
import java.net.URLEncoder;
13
import java.nio.charset.StandardCharsets;
14
import java.net.HttpURLConnection;
15
import java.util.Iterator;
16
import java.util.Queue;
17
import java.util.concurrent.PriorityBlockingQueue;
18
import javax.xml.transform.OutputKeys;
19
import javax.xml.transform.Transformer;
20
import javax.xml.transform.TransformerConfigurationException;
21
import javax.xml.transform.TransformerFactory;
22
import javax.xml.transform.dom.DOMSource;
23
import javax.xml.transform.stream.StreamResult;
24
import javax.xml.xpath.*;
25

    
26
import eu.dnetlib.data.collector.plugins.utils.JsonUtils;
27
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
28
import org.apache.commons.io.IOUtils;
29
import org.apache.commons.lang3.StringUtils;
30
import org.apache.commons.logging.Log;
31
import org.apache.commons.logging.LogFactory;
32
import org.w3c.dom.Node;
33
import org.w3c.dom.NodeList;
34
import org.xml.sax.InputSource;
35

    
36
/**
37
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak, Alessia Bardi, Miriam Baglioni
38
 * @date 2020-04-09
39
 *
40
 */
41
public class RestIterator implements Iterator<String> {
42
	private final String BASIC = "basic";
43

    
44
	// TODO: clean up the comments of replaced source code
45
	private static final Log log = LogFactory.getLog(RestIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
46

    
47
	private JsonUtils jsonUtils;
48

    
49
	private String baseUrl;
50
	private String resumptionType;
51
	private String resumptionParam;
52
	private String resultFormatValue;
53
	private String queryParams;
54
	private int resultSizeValue;
55
	private int resumptionInt = 0;            // integer resumption token (first record to harvest)
56
	private int resultTotal = -1;
57
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
58
	private InputStream resultStream;
59
	private Transformer transformer;
60
	private XPath xpath;
61
	private String query;
62
	private XPathExpression xprResultTotalPath;
63
	private XPathExpression xprResumptionPath;
64
	private XPathExpression xprEntity;
65
	private String queryFormat;
66
	private String querySize;
67
	private String authMethod;
68
	private String authToken;
69
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
70
	private int discoverResultSize = 0;
71
	private int pagination = 1;
72
	/*
73
	While resultFormatValue is added to the request parameter, this is used to say that the results are retrieved in json.
74
	useful for cases when the target API expects a resultFormatValue != json, but the results are returned in json.
75
	An example is the EU Open Data Portal API: resultFormatValue=standard, results are in json format.
76
	 */
77
	private  String resultOutputFormat;
78
	
79

    
80
	/**
	 * Creates an iterator over the records served by a REST endpoint.
	 * No request is sent here; the first page is downloaded by the first call to {@link #next()}.
	 *
	 * @param baseUrl            endpoint URL without the query string
	 * @param resumptionType     resumption strategy (scan, count, discover, pagination/page, deep-cursor)
	 * @param resumptionParam    name of the request parameter carrying the resumption value
	 * @param resumptionXpath    XPath of the resumption token in a response; "/" is used when blank
	 * @param resultTotalXpath   XPath of the total number of results in a response
	 * @param resultFormatParam  name of the result-format request parameter; omitted from the URL when blank
	 * @param resultFormatValue  value of the result-format request parameter
	 * @param resultSizeParam    name of the page-size request parameter; omitted from the URL when blank
	 * @param resultSizeValueStr page size as a decimal integer
	 * @param queryParams        fixed part of the query string
	 * @param entityXpath        XPath selecting the record nodes of a response
	 * @param authMethod         "bearer" or "basic" to send an Authorization header, anything else for none
	 * @param authToken          credential placed in the Authorization header
	 * @param resultOutputFormat "json" when the responses are JSON and must be converted to XML first
	 * @throws NumberFormatException if {@code resultSizeValueStr} is not a valid integer
	 * @throws IllegalStateException if the transformer or one of the XPath expressions cannot be set up
	 */
	public RestIterator(
			final String baseUrl,
			final String resumptionType,
			final String resumptionParam,
			final String resumptionXpath,
			final String resultTotalXpath,
			final String resultFormatParam,
			final String resultFormatValue,
			final String resultSizeParam,
			final String resultSizeValueStr,
			final String queryParams,
			final String entityXpath,
			final String authMethod,
			final String authToken,
			final String resultOutputFormat
	) {
		this.jsonUtils = new JsonUtils();
		this.baseUrl = baseUrl;
		this.resumptionType = resumptionType;
		this.resumptionParam = resumptionParam;
		this.resultFormatValue = resultFormatValue;
		this.queryParams = queryParams;
		// parseInt avoids the needless boxing of Integer.valueOf
		this.resultSizeValue = Integer.parseInt(resultSizeValueStr);
		this.authMethod = authMethod;
		this.authToken = authToken;
		this.resultOutputFormat = resultOutputFormat;

		// optional query-string fragments: left empty when the parameter name is not configured
		queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
		querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";

		try {
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
		} catch (Exception e) {
			// keep the original exception as cause so the stack trace is not lost
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage(), e);
		}
		initQueue();
	}
117
	
118
	
119
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath)
120
			throws TransformerConfigurationException, XPathExpressionException {
121
		transformer = TransformerFactory.newInstance().newTransformer();
122
		transformer.setOutputProperty(OutputKeys.INDENT, "yes");
123
		transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "3");
124
		xpath = XPathFactory.newInstance().newXPath();
125
		xprResultTotalPath = xpath.compile(resultTotalXpath);
126
		xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
127
		xprEntity = xpath.compile(entityXpath);
128
	}
129

    
130
	private void initQueue() {
131
		query = baseUrl + "?" + queryParams + querySize + queryFormat;
132
		log.info("REST calls starting with "+query);
133
	}
134

    
135
	private void disconnect() {
136
		// TODO close inputstream
137
	}
138

    
139
	/* (non-Javadoc)
140
	 * @see java.util.Iterator#hasNext()
141
	 */
142
	@Override
143
	public boolean hasNext() {
144
		if (recordQueue.isEmpty() && query.isEmpty()) {
145
			disconnect();
146
			return false;
147
		} else {
148
			return true;
149
		}
150
	}
151

    
152
	/* (non-Javadoc)
153
	 * @see java.util.Iterator#next()
154
	 */
155
	@Override
156
	public String next() {
157
		synchronized (recordQueue) {
158
			while (recordQueue.isEmpty() && !query.isEmpty()) {
159
				try {
160
					log.debug("get Query: " + query);
161
					query = downloadPage(query);
162
					log.debug("next queryURL from downloadPage(): " + query);
163
				} catch (CollectorServiceException e) {
164
					log.debug("CollectorPlugin.next()-Exception: " + e);
165
					throw new RuntimeException(e);
166
				}
167
			}
168
			return recordQueue.poll();
169
		}
170
	}
171

    
172
	/*
173
	 * download page and return nextQuery
174
	 */
175
	private String downloadPage(String query) throws CollectorServiceException {
176
		String resultJson;
177
		String resultXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>";
178
		String nextQuery = "";
179
		String emptyXml = resultXml + "<" + JsonUtils.wrapName + "></" + JsonUtils.wrapName + ">";
180
		Node resultNode = null;
181
		NodeList nodeList = null;
182
		String qUrlArgument = "";
183
		int urlOldResumptionSize = 0;
184
		InputStream theHttpInputStream;
185
		
186
		// check if cursor=* is initial set otherwise add it to the queryParam URL
187
		if( resumptionType.equalsIgnoreCase("deep-cursor") ) {
188
			log.debug("check resumptionType deep-cursor and check cursor=*?" + query);
189
			if(!query.contains("&cursor=")) {
190
				query += "&cursor=*";
191
			}
192
		}
193

    
194
		try {
195
			URL qUrl = new URL(query);
196
			log.debug("authMethod :" + authMethod);
197
			if (this.authMethod == "bearer") {
198
				log.trace("authMethod before inputStream: " + resultXml);
199
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
200
	        	conn.setRequestProperty("Authorization","Bearer "+authToken);
201
	        	conn.setRequestProperty("Content-Type","application/json");
202
	        	conn.setRequestMethod("GET");
203
	        	theHttpInputStream = conn.getInputStream();
204
			}else if (BASIC.equalsIgnoreCase(this.authMethod)) {
205
				log.trace("authMethod before inputStream: " + resultXml);
206
				HttpURLConnection conn = (HttpURLConnection) qUrl.openConnection();
207
				conn.setRequestProperty("Authorization","Basic "+authToken);
208
				conn.setRequestProperty("accept","application/xml");
209
				conn.setRequestMethod("GET");
210
				theHttpInputStream = conn.getInputStream();
211
			} else {
212
				theHttpInputStream = qUrl.openStream();
213
			}
214
			
215
			resultStream = theHttpInputStream;
216
			if ("json".equals(resultOutputFormat)) {
217
				resultJson = IOUtils.toString(resultStream, "UTF-8");
218
				resultXml = jsonUtils.convertToXML(resultJson);
219
				resultStream = IOUtils.toInputStream(resultXml, "UTF-8");
220
			}
221

    
222
			if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
223
				resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
224
				nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
225
				log.debug("nodeList.length: " + nodeList.getLength());
226
				for (int i = 0; i < nodeList.getLength(); i++) {
227
					StringWriter sw = new StringWriter();
228
					transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
229
					String toEnqueue = sw.toString();
230
					if(toEnqueue == null || StringUtils.isBlank(toEnqueue) || emptyXml.equalsIgnoreCase(toEnqueue)){
231
						log.warn("The following record resulted in empty item for the feeding queue: "+resultXml);
232
					}
233
					else{ recordQueue.add(sw.toString());}
234
				}
235
			} else { log.warn("resultXml is equal with emptyXml"); }
236

    
237
			resumptionInt += resultSizeValue;
238

    
239
			switch (resumptionType.toLowerCase()) {
240
			case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
241
				resumptionStr = xprResumptionPath.evaluate(resultNode);
242
				break;
243

    
244
			case "count":   // begin at one step for all records, iterate over items
245
				resumptionStr = Integer.toString(resumptionInt);
246
				break;
247

    
248
			case "discover":   // size of result items unknown, iterate over items  (for openDOAR - 201808)
249
				if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: discover, Param 'resultSizeValue' is less than 2");}
250
				qUrlArgument = qUrl.getQuery();
251
				String[] arrayQUrlArgument = qUrlArgument.split("&");
252
				for (String arrayUrlArgStr : arrayQUrlArgument) {
253
					if (arrayUrlArgStr.startsWith(resumptionParam)) {
254
						String[] resumptionKeyValue = arrayUrlArgStr.split("=");
255
						if(isInteger(resumptionKeyValue[1])) {
256
							urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
257
							log.debug("discover OldResumptionSize from Url (int): " + urlOldResumptionSize);
258
						} else {
259
							log.debug("discover OldResumptionSize from Url (str): " + resumptionKeyValue[1]);
260
						}
261
					}
262
				}
263

    
264
				if (((emptyXml.toLowerCase()).equals(resultXml.toLowerCase()))
265
						|| ((nodeList != null) && (nodeList.getLength() < resultSizeValue))
266
				) {
267
					// resumptionStr = "";
268
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
269
					resultTotal = discoverResultSize;
270
				} else {
271
					resumptionStr = Integer.toString(resumptionInt);
272
					resultTotal = resumptionInt + 1;
273
					if (nodeList != null) { discoverResultSize += nodeList.getLength(); }
274
				}
275
				log.debug("discoverResultSize:  " + discoverResultSize);
276
				break;
277

    
278
			case "pagination":
279
			case "page":         // pagination, iterate over page numbers
280
				pagination += 1;
281
				if (nodeList != null) {
282
					discoverResultSize += nodeList.getLength();
283
				} else {
284
					resultTotal = discoverResultSize;
285
					pagination = discoverResultSize;
286
				}
287
				resumptionInt = pagination;
288
				resumptionStr = Integer.toString(resumptionInt);
289
				break;
290

    
291
			case "deep-cursor":   // size of result items unknown, iterate over items  (for supporting deep cursor in solr)
292
				// isn't relevant -- if (resultSizeValue < 2) {throw new CollectorServiceException("Mode: deep-cursor, Param 'resultSizeValue' is less than 2");}
293

    
294
				resumptionStr = encodeValue(xprResumptionPath.evaluate(resultNode));
295
				queryParams = queryParams.replace("&cursor=*", "");
296
				
297
				// terminating if length of nodeList is 0
298
				if( (nodeList != null) && (nodeList.getLength() < discoverResultSize) ) {
299
					resumptionInt += ( nodeList.getLength() + 1 - resultSizeValue);
300
				} else {
301
					resumptionInt += (nodeList.getLength() - resultSizeValue);	// subtract the resultSizeValue because the iteration is over real length and the resultSizeValue is added before the switch()
302
				}
303
				
304
				discoverResultSize = nodeList.getLength();
305
				
306
				log.debug("downloadPage().deep-cursor: resumptionStr=" + resumptionStr + " ; queryParams=" + queryParams + " resumptionLengthIncreased: " + resumptionInt);
307

    
308
				break;
309
			
310
			default:        // otherwise: abort
311
				// resultTotal = resumptionInt;
312
				break;
313
			}
314

    
315
		} catch (Exception e) {
316
			log.error(e);
317
			throw new IllegalStateException("collection failed: " + e.getMessage());
318
		}			
319
			
320
		try {
321
			if (resultTotal == -1) {
322
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
323
				if (resumptionType.toLowerCase().equals("page") && !BASIC.equalsIgnoreCase(authMethod)) { resultTotal += 1; }           // to correct the upper bound
324
				log.info("resultTotal was -1 is now: " + resultTotal);
325
		}
326
		} catch(Exception e) {
327
			log.error(e);
328
			throw new IllegalStateException("downloadPage() resultTotal couldn't parse: " + e.getMessage());
329
		}
330
		log.debug("resultTotal: " + resultTotal);
331
		log.debug("resInt: " + resumptionInt);
332
		if (resumptionInt <= resultTotal) {
333
			nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
334
		} else {
335
			nextQuery = "";
336
			// if (resumptionType.toLowerCase().equals("deep-cursor")) { resumptionInt -= 1; }    	// correct the resumptionInt and prevent a NullPointer Exception at mdStore 
337
		}
338
		log.debug("downloadPage() nextQueryUrl: " + nextQuery);
339
		return nextQuery;
340

    
341

    
342
	}
343

    
344

    
345
	
346
	private boolean isInteger(String s) {
347
		boolean isValidInteger = false;
348
		try {
349
			Integer.parseInt(s);
350

    
351
			// s is a valid integer
352

    
353
			isValidInteger = true;
354
		} catch (NumberFormatException ex) {
355
			// s is not an integer
356
		}
357

    
358
		return isValidInteger;
359
	}
360
	
361
	// Method to encode a string value using `UTF-8` encoding scheme
362
    private String encodeValue(String value) {
363
        try {
364
            return URLEncoder.encode(value, StandardCharsets.UTF_8.toString());
365
        } catch (UnsupportedEncodingException ex) {
366
            throw new RuntimeException(ex.getCause());
367
        }
368
    }
369

    
370
	public String getResultFormatValue() {
371
		return resultFormatValue;
372
	}
373

    
374
	public String getResultOutputFormat() {
375
		return resultOutputFormat;
376
	}
377

    
378
}
(2-2/2)