Project

General

Profile

1
/**
2
 * 
3
 */
4
package eu.dnetlib.data.collector.plugins.rest;
5

    
6
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URL;
import java.util.Iterator;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.regex.Pattern;

import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

import eu.dnetlib.data.collector.plugins.oai.OaiIterator;
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
35

    
36
/**
37
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
38
 * @date 2018-08-06
39
 *
40
 */
41
public class RestIterator implements Iterator<String> {
42

    
43
    // TODO: clean up the comments of replaced source code
44
	private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
45

    
46
	private static final String wrapName = "recordWrap";
47
	private String baseUrl;
48
	private String resumptionType;
49
	private String resumptionParam;
50
	private String resultFormatValue;
51
	private String queryParams;
52
	private int resultSizeValue;
53
	private int resumptionInt = 0;			// integer resumption token (first record to harvest)
54
	private int resultTotal = -1;
55
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
56
	private InputStream resultStream;
57
	private Transformer transformer;
58
	private XPath xpath;
59
	private String query;
60
	private XPathExpression xprResultTotalPath;
61
	private XPathExpression xprResumptionPath;
62
	private XPathExpression xprEntity;
63
	private String queryFormat;
64
	private String querySize;
65
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
66
	
67
	public RestIterator(
68
			final String baseUrl,
69
			final String resumptionType,
70
			final String resumptionParam,
71
			final String resumptionXpath,
72
			final String resultTotalXpath,
73
			final String resultFormatParam,
74
			final String resultFormatValue,
75
			final String resultSizeParam,
76
                        final String resultSizeValueStr,
77
			final String queryParams,
78
			final String entityXpath
79
			) {
80
		this.baseUrl = baseUrl;
81
		this.resumptionType = resumptionType;
82
		this.resumptionParam = resumptionParam;
83
		this.resultFormatValue = resultFormatValue;
84
		this.queryParams = queryParams;
85
                this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
86
                        
87
        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
88
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
89

    
90
		try {
91
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
92
		} catch(Exception e) {
93
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
94
		}
95
        initQueue();
96
	}
97
	
98
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) throws TransformerConfigurationException, XPathExpressionException{
99
		transformer = TransformerFactory.newInstance().newTransformer();
100
                transformer.setOutputProperty(OutputKeys.INDENT,"yes"); 
101
                transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount","3");
102
		xpath              = XPathFactory.newInstance().newXPath();
103
		xprResultTotalPath = xpath.compile(resultTotalXpath);
104
		xprResumptionPath  = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
105
		xprEntity          = xpath.compile(entityXpath);
106
	}
107
	
108
	private void initQueue() {
109
		query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
110
	}
111
	
112
	private void disconnect() {
113
		// TODO close inputstream
114
	}
115
	
116
	/* (non-Javadoc)
117
	 * @see java.util.Iterator#hasNext()
118
	 */
119
	@Override
120
	public boolean hasNext() {
121
		if (recordQueue.isEmpty() && query.isEmpty()) {
122
			disconnect();
123
			return false;
124
		} else {
125
			return true;
126
		}
127
	}
128

    
129
	/* (non-Javadoc)
130
	 * @see java.util.Iterator#next()
131
	 */
132
	@Override
133
	public String next() {
134
		synchronized (recordQueue) {
135
			while (recordQueue.isEmpty() && !query.isEmpty() ) {
136
				try {
137
					query = downloadPage(query);
138
				} catch(CollectorServiceException e) {
139
					throw new RuntimeException(e);
140
				}
141
			}
142
			return recordQueue.poll();
143
		}
144
	}
145
	
146
	
147
	/*
148
	 * download page and return nextQuery
149
	 */
150
	private String downloadPage(String query) throws CollectorServiceException{
151
		String resultJson;
152
		String resultXml = "";
153
		String nextQuery = "";
154
                String emptyXml = "<"+wrapName+"></"+wrapName+">";
155
                Node resultNode = null;
156
                NodeList nodeList;
157
                
158
		try {
159
                        resultStream = new URL(query).openStream();
160
			if("json".equals(resultFormatValue.toLowerCase())){				
161
				resultJson = IOUtils.toString(resultStream,"UTF-8");
162

    
163
				//TODO move regex definitions as constant fields
164
				// pre-clean json - rid spaces of element names (misinterpreted as elements with attributes in xml)
165
				resultJson = syntaxConvertJsonKeyNamens(resultJson);
166
//                                while(resultJson.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")){
167
//					resultJson = resultJson.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
168
//				}
169
				org.json.JSONObject jsonObject = new org.json.JSONObject(resultJson);
170
				resultXml = org.json.XML.toString(jsonObject,wrapName); // wrap xml in single root element
171
				log.trace(resultXml);
172
				resultStream = IOUtils.toInputStream(resultXml,"UTF-8");
173
			}
174
			
175
                        if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
176
                            resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
177
                            nodeList   = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
178
                            log.debug("nodeList length: " + nodeList.getLength());
179
                            for (int i = 0; i < nodeList.getLength(); i++) {
180
                                StringWriter sw = new StringWriter();
181
                                transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
182
                                recordQueue.add(sw.toString());
183
                            }
184
                        }	
185
			resumptionInt += resultSizeValue;
186
                        
187
/*	replaced by switch statement as follow		
188
                        if("scan".equals(resumptionType.toLowerCase())) { resumptionStr = xprResumptionPath.evaluate(resultNode);}
189
			if("count".equals(resumptionType.toLowerCase())){ resumptionStr = Integer.toString(resumptionInt); }
190
*/
191
                        switch(resumptionType.toLowerCase()) {
192
                            case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI
193
                                            resumptionStr = xprResumptionPath.evaluate(resultNode);
194
                                            break;
195
                                            
196
                            case "count":   // begin at one step for all records
197
                                            resumptionStr = Integer.toString(resumptionInt);
198
                                            break;
199
                                            
200
                            case "discover":   // length of results unknown                                      
201
                                            if( (emptyXml.toLowerCase()).equals(resultXml.toLowerCase()) ) {
202
                                                resumptionStr = "";
203
                                                resultTotal   = resumptionInt;
204
                                            } else {
205
                                                resumptionStr = Integer.toString(resumptionInt);
206
                                                resultTotal   = resumptionInt+1;
207
                                            }
208
                                            break;
209
                                            
210
                            default:        // otherwise: abort
211
                                            resultTotal = resumptionInt;
212
                                            break;
213
                        }
214

    
215
			if (resultTotal == -1) {
216
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
217
				log.info("resultTotal was -1 is now: " + resultTotal);
218
			}
219
			log.info("resultTotal: " + resultTotal);
220
			log.info("resInt: " + resumptionInt);
221
			if (resumptionInt < resultTotal) {
222
				nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
223
			} else
224
				nextQuery = "";
225
                        
226
                        log.debug("nextQueryUrl: " + nextQuery);
227
			return nextQuery;
228

    
229
		} catch(Exception e) {
230
			log.error(e);
231
			throw new IllegalStateException("collection failed: " + e.getMessage());
232
		}
233
	}
234
        
235
        /**
236
         * convert in Json-KeyName 'whitespace(s)' to '_' and '/' to '_', '(' and ')' to ''
237
         * 
238
         * @param jsonInput
239
         * @return 
240
         */
241
        private String syntaxConvertJsonKeyNamens(String jsonInput) {
242

    
243
            log.trace("before convertJsonKeyNames: " + jsonInput);
244
            // replace ' 's in JSON Namens with '_'
245
            while (jsonInput.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")) {
246
                jsonInput = jsonInput.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
247
            }
248

    
249
            // replace forward-slash (sign '/' ) in JSON Names with '_'
250
            while (jsonInput.matches(".*\"([^\"]*)/([^\"]*)\":.*")) {
251
                jsonInput = jsonInput.replaceAll("\"([^\"]*)/([^\"]*)\":", "\"$1_$2\":");
252
            }
253

    
254
            // replace '(' in JSON Names with ''
255
            while (jsonInput.matches(".*\"([^\"]*)[(]([^\"]*)\":.*")) {
256
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[(]([^\"]*)\":", "\"$1$2\":");
257
            }
258
            // replace ')' in JSON Names with ''
259
            while (jsonInput.matches(".*\"([^\"]*)[)]([^\"]*)\":.*")) {
260
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[)]([^\"]*)\":", "\"$1$2\":");
261
            }
262

    
263
            log.trace("after syntaxConvertJsonKeyNames: " + jsonInput);
264
            return jsonInput;
265
        }
266

    
267
}
(2-2/2)