/**
 * 
 */
package eu.dnetlib.data.collector.plugins.rest;

import java.io.InputStream;
import java.io.StringWriter;
import java.net.URL;
import java.util.Iterator;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.regex.Pattern;

import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

import eu.dnetlib.data.collector.plugins.oai.OaiIterator;
import eu.dnetlib.data.collector.rmi.CollectorServiceException;

/**
37
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
38
 * @date 2018-08-06
39
 *
40
 */
41
public class RestIterator implements Iterator<String> {
42

    
43
    // TODO: clean up the comments of replaced source code
44
	private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
45

    
46
	private static final String wrapName = "recordWrap";
47
	private String baseUrl;
48
	private String resumptionType;
49
	private String resumptionParam;
50
	private String resultFormatValue;
51
	private String queryParams;
52
	private int resultSizeValue;
53
	private int resumptionInt = 0;			// integer resumption token (first record to harvest)
54
	private int resultTotal = -1;
55
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
56
	private InputStream resultStream;
57
	private Transformer transformer;
58
	private XPath xpath;
59
	private String query;
60
	private XPathExpression xprResultTotalPath;
61
	private XPathExpression xprResumptionPath;
62
	private XPathExpression xprEntity;
63
	private String queryFormat;
64
	private String querySize;
65
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
66
	
67
	public RestIterator(
68
			final String baseUrl,
69
			final String resumptionType,
70
			final String resumptionParam,
71
			final String resumptionXpath,
72
			final String resultTotalXpath,
73
			final String resultFormatParam,
74
			final String resultFormatValue,
75
			final String resultSizeParam,
76
                        final String resultSizeValueStr,
77
			final String queryParams,
78
			final String entityXpath
79
			) {
80
		this.baseUrl = baseUrl;
81
		this.resumptionType = resumptionType;
82
		this.resumptionParam = resumptionParam;
83
		this.resultFormatValue = resultFormatValue;
84
		this.queryParams = queryParams;
85
                this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
86
                        
87
        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
88
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
89

    
90
		try {
91
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
92
		} catch(Exception e) {
93
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
94
		}
95
        initQueue();
96
	}
97
	
98
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) throws TransformerConfigurationException, XPathExpressionException{
99
		transformer = TransformerFactory.newInstance().newTransformer();
100
                transformer.setOutputProperty(OutputKeys.INDENT,"yes"); 
101
                transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount","3");
102
		xpath              = XPathFactory.newInstance().newXPath();
103
		xprResultTotalPath = xpath.compile(resultTotalXpath);
104
		xprResumptionPath  = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
105
		xprEntity          = xpath.compile(entityXpath);
106
	}
107
	
108
	private void initQueue() {
109
		query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
110
	}
111
	
112
	private void disconnect() {
113
		// TODO close inputstream
114
	}
115
	
116
	/* (non-Javadoc)
117
	 * @see java.util.Iterator#hasNext()
118
	 */
119
	@Override
120
	public boolean hasNext() {
121
		if (recordQueue.isEmpty() && query.isEmpty()) {
122
			disconnect();
123
			return false;
124
		} else {
125
			return true;
126
		}
127
	}
128

    
129
	/* (non-Javadoc)
130
	 * @see java.util.Iterator#next()
131
	 */
132
	@Override
133
	public String next() {
134
		synchronized (recordQueue) {
135
			while (recordQueue.isEmpty() && !query.isEmpty() ) {
136
				try {
137
					query = downloadPage(query);
138
				} catch(CollectorServiceException e) {
139
					throw new RuntimeException(e);
140
				}
141
			}
142
			return recordQueue.poll();
143
		}
144
	}
145
	
146
	
147
	/*
148
	 * download page and return nextQuery
149
	 */
150
	private String downloadPage(String query) throws CollectorServiceException{
151
		String resultJson;
152
		String resultXml = "";
153
		String nextQuery = "";
154
                String emptyXml = "<"+wrapName+"></"+wrapName+">";
155
                Node resultNode = null;
156
                NodeList nodeList = null;
157
                int discoverResultSize = 0;
158
                
159
		try {
160
                        resultStream = new URL(query).openStream();
161
			if("json".equals(resultFormatValue.toLowerCase())){				
162
				resultJson = IOUtils.toString(resultStream,"UTF-8");
163

    
164
				//TODO move regex definitions as constant fields
165
				// pre-clean json - rid spaces of element names (misinterpreted as elements with attributes in xml)
166
				resultJson = syntaxConvertJsonKeyNamens(resultJson);
167
//                                while(resultJson.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")){
168
//					resultJson = resultJson.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
169
//				}
170
				org.json.JSONObject jsonObject = new org.json.JSONObject(resultJson);
171
				resultXml = org.json.XML.toString(jsonObject,wrapName); // wrap xml in single root element
172
				log.trace(resultXml);
173
				resultStream = IOUtils.toInputStream(resultXml,"UTF-8");
174
			}
175
			
176
                        if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
177
                            resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
178
                            nodeList   = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
179
                            log.debug("nodeList.length: " + nodeList.getLength());
180
                            for (int i = 0; i < nodeList.getLength(); i++) {
181
                                StringWriter sw = new StringWriter();
182
                                transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
183
                                recordQueue.add(sw.toString());
184
                            }
185
                        } else { log.info("resultXml is equal with emptyXml"); }	
186
                        
187
			resumptionInt += resultSizeValue;
188
                        
189
/*	replaced by switch statement as follow		
190
                        if("scan".equals(resumptionType.toLowerCase())) { resumptionStr = xprResumptionPath.evaluate(resultNode);}
191
			if("count".equals(resumptionType.toLowerCase())){ resumptionStr = Integer.toString(resumptionInt); }
192
*/
193
                        switch(resumptionType.toLowerCase()) {
194
                            case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI
195
                                            resumptionStr = xprResumptionPath.evaluate(resultNode);
196
                                            break;
197
                                            
198
                            case "count":   // begin at one step for all records
199
                                            resumptionStr = Integer.toString(resumptionInt);
200
                                            break;
201
                                            
202
                            case "discover":   // length of results unknown                                      
203
                                            if( (emptyXml.toLowerCase()).equals(resultXml.toLowerCase()) ) {
204
                                                // resumptionStr = "";
205
                                                resultTotal   = discoverResultSize;
206
                                            } else {
207
                                                resumptionStr = Integer.toString(resumptionInt);
208
                                                resultTotal   = resumptionInt+1;
209
                                                if(nodeList != null) { discoverResultSize += nodeList.getLength(); }
210
                                            }
211
                                            log.info("discoverResultSize:  " + discoverResultSize);
212
                                            break;
213
                                            
214
                            default:        // otherwise: abort
215
                                            // resultTotal = resumptionInt;
216
                                            break;
217
                        }
218

    
219
			if (resultTotal == -1) {
220
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
221
				log.info("resultTotal was -1 is now: " + resultTotal);
222
			}
223
			log.info("resultTotal: " + resultTotal);
224
			log.info("resInt: " + resumptionInt);
225
			if (resumptionInt < resultTotal) {
226
				nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
227
			} else
228
				nextQuery = "";
229
                        
230
                        log.debug("nextQueryUrl: " + nextQuery);
231
			return nextQuery;
232

    
233
		} catch(Exception e) {
234
			log.error(e);
235
			throw new IllegalStateException("collection failed: " + e.getMessage());
236
		}
237
	}
238
        
239
        /**
240
         * convert in Json-KeyName 'whitespace(s)' to '_' and '/' to '_', '(' and ')' to ''
241
         * 
242
         * @param jsonInput
243
         * @return 
244
         */
245
        private String syntaxConvertJsonKeyNamens(String jsonInput) {
246

    
247
            log.trace("before convertJsonKeyNames: " + jsonInput);
248
            // replace ' 's in JSON Namens with '_'
249
            while (jsonInput.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")) {
250
                jsonInput = jsonInput.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
251
            }
252

    
253
            // replace forward-slash (sign '/' ) in JSON Names with '_'
254
            while (jsonInput.matches(".*\"([^\"]*)/([^\"]*)\":.*")) {
255
                jsonInput = jsonInput.replaceAll("\"([^\"]*)/([^\"]*)\":", "\"$1_$2\":");
256
            }
257

    
258
            // replace '(' in JSON Names with ''
259
            while (jsonInput.matches(".*\"([^\"]*)[(]([^\"]*)\":.*")) {
260
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[(]([^\"]*)\":", "\"$1$2\":");
261
            }
262
            // replace ')' in JSON Names with ''
263
            while (jsonInput.matches(".*\"([^\"]*)[)]([^\"]*)\":.*")) {
264
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[)]([^\"]*)\":", "\"$1$2\":");
265
            }
266

    
267
            log.trace("after syntaxConvertJsonKeyNames: " + jsonInput);
268
            return jsonInput;
269
        }
270

    
271
}
(2-2/2)