Project

General

Profile

1
/**
 * Note: log.debug(...) is equal to log.trace(...) in the application logs.
 *
 * Known bug: with resumptionType 'discover', collecting fails if
 * (resultTotal % resultSizeValue) == 0; as a workaround, change the resultSizeValue.
 */
8
package eu.dnetlib.data.collector.plugins.rest;
9

    
10
import java.io.InputStream;
11
import java.io.StringWriter;
12
import java.net.URL;
13
import java.util.Iterator;
14
import java.util.Queue;
15
import java.util.concurrent.PriorityBlockingQueue;
16

    
17
import javax.xml.transform.OutputKeys;
18
import javax.xml.transform.Transformer;
19
import javax.xml.transform.TransformerConfigurationException;
20
import javax.xml.transform.TransformerFactory;
21
import javax.xml.transform.dom.DOMSource;
22
import javax.xml.transform.stream.StreamResult;
23
import javax.xml.xpath.XPath;
24
import javax.xml.xpath.XPathConstants;
25
import javax.xml.xpath.XPathExpression;
26
import javax.xml.xpath.XPathExpressionException;
27
import javax.xml.xpath.XPathFactory;
28

    
29
import org.apache.commons.io.IOUtils;
30
import org.apache.commons.lang3.StringUtils;
31
import org.apache.commons.logging.Log;
32
import org.apache.commons.logging.LogFactory;
33
import org.w3c.dom.Node;
34
import org.w3c.dom.NodeList;
35
import org.xml.sax.InputSource;
36

    
37
import eu.dnetlib.data.collector.plugins.oai.OaiIterator;
38
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
39

    
40
/**
41
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
42
 * @date 2018-09-03
43
 *
44
 */
45
public class RestIterator implements Iterator<String> {
46

    
47
    // TODO: clean up the comments of replaced source code
48
	private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
49

    
50
	private static final String wrapName = "recordWrap";
51
	private String baseUrl;
52
	private String resumptionType;
53
	private String resumptionParam;
54
	private String resultFormatValue;
55
	private String queryParams;
56
	private int resultSizeValue;
57
	private int resumptionInt = 0;			// integer resumption token (first record to harvest)
58
	private int resultTotal = -1;
59
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
60
	private InputStream resultStream;
61
	private Transformer transformer;
62
	private XPath xpath;
63
	private String query;
64
	private XPathExpression xprResultTotalPath;
65
	private XPathExpression xprResumptionPath;
66
	private XPathExpression xprEntity;
67
	private String queryFormat;
68
	private String querySize;
69
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
70
        private int discoverResultSize = 0;
71

    
72
	public RestIterator(
73
			final String baseUrl,
74
			final String resumptionType,
75
			final String resumptionParam,
76
			final String resumptionXpath,
77
			final String resultTotalXpath,
78
			final String resultFormatParam,
79
			final String resultFormatValue,
80
			final String resultSizeParam,
81
                        final String resultSizeValueStr,
82
			final String queryParams,
83
			final String entityXpath
84
			) {
85
		this.baseUrl = baseUrl;
86
		this.resumptionType = resumptionType;
87
		this.resumptionParam = resumptionParam;
88
		this.resultFormatValue = resultFormatValue;
89
		this.queryParams = queryParams;
90
                this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
91
                        
92
        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
93
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
94

    
95
		try {
96
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
97
		} catch(Exception e) {
98
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
99
		}
100
        initQueue();
101
	}
102
	
103
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) throws TransformerConfigurationException, XPathExpressionException{
104
		transformer = TransformerFactory.newInstance().newTransformer();
105
                transformer.setOutputProperty(OutputKeys.INDENT,"yes"); 
106
                transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount","3");
107
		xpath              = XPathFactory.newInstance().newXPath();
108
		xprResultTotalPath = xpath.compile(resultTotalXpath);
109
		xprResumptionPath  = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
110
		xprEntity          = xpath.compile(entityXpath);
111
	}
112
	
113
	private void initQueue() {
114
//		query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
115
                //  change at 2018-08-30 and remove resumptionParam&-Type from "query" URL because first request starts mostly at the beginning.
116
		query = baseUrl + "?" + queryParams + querySize + queryFormat;
117

    
118
	}
119
	
120
	private void disconnect() {
121
		// TODO close inputstream
122
	}
123
	
124
	/* (non-Javadoc)
125
	 * @see java.util.Iterator#hasNext()
126
	 */
127
	@Override
128
	public boolean hasNext() {
129
		if (recordQueue.isEmpty() && query.isEmpty()) {
130
			disconnect();
131
			return false;
132
		} else {
133
			return true;
134
		}
135
	}
136

    
137
	/* (non-Javadoc)
138
	 * @see java.util.Iterator#next()
139
	 */
140
	@Override
141
	public String next() {
142
		synchronized (recordQueue) {
143
			while (recordQueue.isEmpty() && !query.isEmpty() ) {
144
				try {
145
                                        log.info("get Query: " + query);
146
					query = downloadPage(query);
147
                                        log.debug("next query from downloadPage method: " + query);
148
				} catch(CollectorServiceException e) {
149
                                        log.debug("CollectorPlugin.next()-Exception: " + e);
150
					throw new RuntimeException(e);
151
				}
152
			}
153
			return recordQueue.poll();
154
		}
155
	}
156
	
157
	
158
	/*
159
	 * download page and return nextQuery
160
	 */
161
	private String downloadPage(String query) throws CollectorServiceException{
162
		String resultJson;
163
		String resultXml = "";
164
		String nextQuery = "";
165
                String emptyXml = "<"+wrapName+"></"+wrapName+">";
166
                Node resultNode = null;
167
                NodeList nodeList = null;
168
                String qUrlArgument = "";
169
                int urlOldResumptionSize = 0;
170
                
171
		try {
172
                        URL qUrl = new URL(query);
173
                        
174
                        resultStream = qUrl.openStream();
175
			if("json".equals(resultFormatValue.toLowerCase())){				
176
				resultJson = IOUtils.toString(resultStream,"UTF-8");
177

    
178
				//TODO move regex definitions as constant fields
179
				// pre-clean json - rid spaces of element names (misinterpreted as elements with attributes in xml)
180
				resultJson = syntaxConvertJsonKeyNamens(resultJson);
181
//                                while(resultJson.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")){
182
//					resultJson = resultJson.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
183
//				}
184
				org.json.JSONObject jsonObject = new org.json.JSONObject(resultJson);
185
                                resultXml = org.json.XML.toString(jsonObject,wrapName); // wrap xml in single root element
186
				log.trace(resultXml);
187
				resultStream = IOUtils.toInputStream(resultXml,"UTF-8");
188
			}
189
			
190
                        if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
191
                            resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
192
                            nodeList   = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
193
                            log.debug("nodeList.length: " + nodeList.getLength());
194
                            for (int i = 0; i < nodeList.getLength(); i++) {
195
                                StringWriter sw = new StringWriter();
196
                                transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
197
                                recordQueue.add(sw.toString());
198
                            }
199
                        } else { log.info("resultXml is equal with emptyXml"); }	
200
                        
201
			resumptionInt += resultSizeValue;
202
                        
203
/*	replaced by switch statement as follow		
204
                        if("scan".equals(resumptionType.toLowerCase())) { resumptionStr = xprResumptionPath.evaluate(resultNode);}
205
			if("count".equals(resumptionType.toLowerCase())){ resumptionStr = Integer.toString(resumptionInt); }
206
*/
207
                        switch(resumptionType.toLowerCase()) {
208
                            case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI, iterate over items
209
                                            resumptionStr = xprResumptionPath.evaluate(resultNode);
210
                                            break;
211
                                            
212
                            case "count":   // begin at one step for all records, iterate over items
213
                                            resumptionStr = Integer.toString(resumptionInt);
214
                                            break;
215
                                            
216
                            case "discover":   // size of result items unknown, iterate over items                                     
217
                                            if (resultSizeValue < 2 ) {throw new CollectorServiceException("Mode: discover, Param 'resultSizeValue' is less than 2");}
218
                                            qUrlArgument = qUrl.getQuery();
219
                                            String[] arrayQUrlArgument = qUrlArgument.split("&");
220
                                            for(String arrayUrlArgStr : arrayQUrlArgument ) {
221
                                                if(arrayUrlArgStr.startsWith(resumptionParam)) {
222
                                                    String[] resumptionKeyValue = arrayUrlArgStr.split("=");
223
                                                    urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
224
                                                    log.debug("discover OldResumptionSize from Url: " + urlOldResumptionSize);
225
                                                }
226
                                            }
227

    
228
                                            // 
229
                                            if(   ( (emptyXml.toLowerCase()).equals(resultXml.toLowerCase()) ) 
230
                                               || ( (nodeList != null) && (nodeList.getLength() < resultSizeValue) ) 
231
                                              ) {
232
                                                // resumptionStr = "";
233
                                                if(nodeList != null) { discoverResultSize += nodeList.getLength(); }
234
                                                resultTotal   = discoverResultSize;
235
                                            } else {
236
                                                resumptionStr = Integer.toString(resumptionInt);
237
                                                resultTotal   = resumptionInt+1;
238
                                                if(nodeList != null) { discoverResultSize += nodeList.getLength(); }
239
                                            }
240
                                            log.info("discoverResultSize:  " + discoverResultSize);
241
                                            break;
242
                                            
243
                            case "page":    // pagination, iterate over pages
244
                                            // TODO pagination collecting
245
                                            break;
246
                                            
247
                            default:        // otherwise: abort
248
                                            // resultTotal = resumptionInt;
249
                                            break;
250
                        }
251

    
252
			if (resultTotal == -1) {
253
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
254
				log.info("resultTotal was -1 is now: " + resultTotal);
255
			}
256
			log.info("resultTotal: " + resultTotal);
257
			log.info("resInt: " + resumptionInt);
258
			if (resumptionInt < resultTotal) {
259
				nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
260
			} else
261
				nextQuery = "";
262
                        
263
                        log.debug("nextQueryUrl: " + nextQuery);
264
			return nextQuery;
265

    
266
		} catch(Exception e) {
267
			log.error(e);
268
			throw new IllegalStateException("collection failed: " + e.getMessage());
269
		}
270
	}
271
        
272
        /**
273
         * convert in JSON-KeyName 'whitespace(s)' to '_' and '/' to '_', '(' and ')' to ''
274
         * check W3C XML syntax: https://www.w3.org/TR/2006/REC-xml11-20060816/#sec-starttags for valid tag names
275
         * and work-around for the JSON to XML converting of org.json.XML-package.
276
         * 
277
         * @param jsonInput
278
         * @return convertedJsonKeynameOutput
279
         */
280
        private String syntaxConvertJsonKeyNamens(String jsonInput) {
281

    
282
            log.trace("before convertJsonKeyNames: " + jsonInput);
283
            // replace ' 's in JSON Namens with '_'
284
            while (jsonInput.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")) {
285
                jsonInput = jsonInput.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
286
            }
287

    
288
            // replace forward-slash (sign '/' ) in JSON Names with '_'
289
            while (jsonInput.matches(".*\"([^\"]*)/([^\"]*)\":.*")) {
290
                jsonInput = jsonInput.replaceAll("\"([^\"]*)/([^\"]*)\":", "\"$1_$2\":");
291
            }
292

    
293
            // replace '(' in JSON Names with ''
294
            while (jsonInput.matches(".*\"([^\"]*)[(]([^\"]*)\":.*")) {
295
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[(]([^\"]*)\":", "\"$1$2\":");
296
            }
297
            
298
            // replace ')' in JSON Names with ''
299
            while (jsonInput.matches(".*\"([^\"]*)[)]([^\"]*)\":.*")) {
300
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[)]([^\"]*)\":", "\"$1$2\":");
301
            }
302

    
303
            // replace startNumbers in JSON Keynames with 'n_'
304
            while (jsonInput.matches(".*\"([^\"][0-9])([^\"]*)\":.*")) {
305
                jsonInput = jsonInput.replaceAll("\"([^\"][0-9])([^\"]*)\":", "\"n_$1$2\":");
306
            }
307
            
308
            // replace ':' between number like '2018-08-28T11:05:00Z' in JSON keynames with ''
309
            while (jsonInput.matches(".*\"([^\"]*[0-9]):([0-9][^\"]*)\":.*")) {
310
                jsonInput = jsonInput.replaceAll("\"([^\"]*[0-9]):([0-9][^\"]*)\":", "\"$1$2\":");
311
            }            
312

    
313
            // replace ',' in JSON Keynames with '.'
314
            while (jsonInput.matches(".*\"([^\"]*),([^\"]*)\":.*")) {
315
                jsonInput = jsonInput.replaceAll("\"([^\"]*),([^\"]*)\":", "\"$1.$2\":");
316
            }
317
            
318
            // replace '=' in JSON Keynames with '-'
319
            while (jsonInput.matches(".*\"([^\"]*)=([^\"]*)\":.*")) {
320
                jsonInput = jsonInput.replaceAll("\"([^\"]*)=([^\"]*)\":", "\"$1-$2\":");
321
            }
322
            
323
            log.trace("after syntaxConvertJsonKeyNames: " + jsonInput);
324
            return jsonInput;
325
        }
326

    
327
}
(2-2/2)