Project

General

Profile

1 50066 jochen.sch
/**
2
 *
3 52983 andreas.cz
 *
4
 * log.debug(...) equal to  log.trace(...) in the application-logs
5 50066 jochen.sch
 */
6
package eu.dnetlib.data.collector.plugins.rest;
7
8
import java.io.InputStream;
9 50582 jochen.sch
import java.io.StringWriter;
10 50066 jochen.sch
import java.net.URL;
11
import java.util.Iterator;
12
import java.util.Queue;
13 50582 jochen.sch
import java.util.concurrent.PriorityBlockingQueue;
14 50066 jochen.sch
15
import javax.xml.transform.OutputKeys;
16
import javax.xml.transform.Transformer;
17
import javax.xml.transform.TransformerConfigurationException;
18
import javax.xml.transform.TransformerFactory;
19 50582 jochen.sch
import javax.xml.transform.dom.DOMSource;
20
import javax.xml.transform.stream.StreamResult;
21 50066 jochen.sch
import javax.xml.xpath.XPath;
22
import javax.xml.xpath.XPathConstants;
23
import javax.xml.xpath.XPathExpression;
24
import javax.xml.xpath.XPathExpressionException;
25
import javax.xml.xpath.XPathFactory;
26
27
import org.apache.commons.io.IOUtils;
28 50662 alessia.ba
import org.apache.commons.lang3.StringUtils;
29 50582 jochen.sch
import org.apache.commons.logging.Log;
30
import org.apache.commons.logging.LogFactory;
31 50066 jochen.sch
import org.w3c.dom.Node;
32 50582 jochen.sch
import org.w3c.dom.NodeList;
33 50066 jochen.sch
import org.xml.sax.InputSource;
34
35 50582 jochen.sch
import eu.dnetlib.data.collector.plugins.oai.OaiIterator;
36
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
37
38 50066 jochen.sch
/**
39 52970 andreas.cz
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
40
 * @date 2018-08-06
41 50066 jochen.sch
 *
42
 */
43
public class RestIterator implements Iterator<String> {
44
45 52971 andreas.cz
    // TODO: clean up the comments of replaced source code
46 50582 jochen.sch
	private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
47
48 50066 jochen.sch
	private static final String wrapName = "recordWrap";
49
	private String baseUrl;
50
	private String resumptionType;
51
	private String resumptionParam;
52
	private String resultFormatValue;
53
	private String queryParams;
54 52970 andreas.cz
	private int resultSizeValue;
55 50066 jochen.sch
	private int resumptionInt = 0;			// integer resumption token (first record to harvest)
56
	private int resultTotal = -1;
57
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
58
	private InputStream resultStream;
59
	private Transformer transformer;
60
	private XPath xpath;
61 50582 jochen.sch
	private String query;
62 50066 jochen.sch
	private XPathExpression xprResultTotalPath;
63
	private XPathExpression xprResumptionPath;
64 50582 jochen.sch
	private XPathExpression xprEntity;
65 50066 jochen.sch
	private String queryFormat;
66
	private String querySize;
67 50582 jochen.sch
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
68 52983 andreas.cz
        private int discoverResultSize = 0;
69
70 50066 jochen.sch
	public RestIterator(
71
			final String baseUrl,
72
			final String resumptionType,
73
			final String resumptionParam,
74
			final String resumptionXpath,
75
			final String resultTotalXpath,
76
			final String resultFormatParam,
77
			final String resultFormatValue,
78
			final String resultSizeParam,
79 52971 andreas.cz
                        final String resultSizeValueStr,
80 50582 jochen.sch
			final String queryParams,
81
			final String entityXpath
82 50066 jochen.sch
			) {
83
		this.baseUrl = baseUrl;
84
		this.resumptionType = resumptionType;
85
		this.resumptionParam = resumptionParam;
86
		this.resultFormatValue = resultFormatValue;
87
		this.queryParams = queryParams;
88 52971 andreas.cz
                this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
89
90 50584 claudio.at
        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
91 52971 andreas.cz
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
92 50066 jochen.sch
93
		try {
94 50582 jochen.sch
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
95 50584 claudio.at
		} catch(Exception e) {
96
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
97 50066 jochen.sch
		}
98
        initQueue();
99
	}
100
101 50582 jochen.sch
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) throws TransformerConfigurationException, XPathExpressionException{
102 50066 jochen.sch
		transformer = TransformerFactory.newInstance().newTransformer();
103 52970 andreas.cz
                transformer.setOutputProperty(OutputKeys.INDENT,"yes");
104
                transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount","3");
105
		xpath              = XPathFactory.newInstance().newXPath();
106 50066 jochen.sch
		xprResultTotalPath = xpath.compile(resultTotalXpath);
107 52970 andreas.cz
		xprResumptionPath  = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
108
		xprEntity          = xpath.compile(entityXpath);
109 50066 jochen.sch
	}
110
111
	private void initQueue() {
112 50582 jochen.sch
		query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
113 50066 jochen.sch
	}
114
115
	private void disconnect() {
116
		// TODO close inputstream
117
	}
118
119
	/* (non-Javadoc)
120
	 * @see java.util.Iterator#hasNext()
121
	 */
122
	@Override
123
	public boolean hasNext() {
124 50582 jochen.sch
		if (recordQueue.isEmpty() && query.isEmpty()) {
125 50066 jochen.sch
			disconnect();
126
			return false;
127
		} else {
128
			return true;
129
		}
130
	}
131
132
	/* (non-Javadoc)
133
	 * @see java.util.Iterator#next()
134
	 */
135
	@Override
136
	public String next() {
137 50582 jochen.sch
		synchronized (recordQueue) {
138
			while (recordQueue.isEmpty() && !query.isEmpty() ) {
139
				try {
140 52983 andreas.cz
                                        log.info("get Query: " + query);
141 50582 jochen.sch
					query = downloadPage(query);
142 52983 andreas.cz
                                        log.debug("next query from downloadPage method: " + query);
143 50584 claudio.at
				} catch(CollectorServiceException e) {
144 52983 andreas.cz
                                        log.debug("CollectorPlugin.next()-Exception: " + e);
145 50582 jochen.sch
					throw new RuntimeException(e);
146
				}
147
			}
148
			return recordQueue.poll();
149
		}
150
	}
151
152
153
	/*
154
	 * download page and return nextQuery
155
	 */
156
	private String downloadPage(String query) throws CollectorServiceException{
157 50066 jochen.sch
		String resultJson;
158
		String resultXml = "";
159 50582 jochen.sch
		String nextQuery = "";
160 52979 andreas.cz
                String emptyXml = "<"+wrapName+"></"+wrapName+">";
161
                Node resultNode = null;
162 52982 andreas.cz
                NodeList nodeList = null;
163 52983 andreas.cz
                String qUrlArgument = "";
164
                int urlOldResumptionSize = 0;
165 52979 andreas.cz
166 50066 jochen.sch
		try {
167 52983 andreas.cz
                        URL qUrl = new URL(query);
168
169
                        resultStream = qUrl.openStream();
170 52970 andreas.cz
			if("json".equals(resultFormatValue.toLowerCase())){
171 50066 jochen.sch
				resultJson = IOUtils.toString(resultStream,"UTF-8");
172 50584 claudio.at
173
				//TODO move regex definitions as constant fields
174 50066 jochen.sch
				// pre-clean json - rid spaces of element names (misinterpreted as elements with attributes in xml)
175 52970 andreas.cz
				resultJson = syntaxConvertJsonKeyNamens(resultJson);
176
//                                while(resultJson.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")){
177
//					resultJson = resultJson.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
178
//				}
179 50066 jochen.sch
				org.json.JSONObject jsonObject = new org.json.JSONObject(resultJson);
180
				resultXml = org.json.XML.toString(jsonObject,wrapName); // wrap xml in single root element
181 52979 andreas.cz
				log.trace(resultXml);
182 50066 jochen.sch
				resultStream = IOUtils.toInputStream(resultXml,"UTF-8");
183
			}
184
185 52979 andreas.cz
                        if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
186
                            resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
187
                            nodeList   = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
188 52982 andreas.cz
                            log.debug("nodeList.length: " + nodeList.getLength());
189 52979 andreas.cz
                            for (int i = 0; i < nodeList.getLength(); i++) {
190
                                StringWriter sw = new StringWriter();
191
                                transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
192
                                recordQueue.add(sw.toString());
193
                            }
194 52982 andreas.cz
                        } else { log.info("resultXml is equal with emptyXml"); }
195
196 50066 jochen.sch
			resumptionInt += resultSizeValue;
197 52970 andreas.cz
198 52971 andreas.cz
/*	replaced by switch statement as follow
199
                        if("scan".equals(resumptionType.toLowerCase())) { resumptionStr = xprResumptionPath.evaluate(resultNode);}
200
			if("count".equals(resumptionType.toLowerCase())){ resumptionStr = Integer.toString(resumptionInt); }
201
*/
202 52970 andreas.cz
                        switch(resumptionType.toLowerCase()) {
203 52979 andreas.cz
                            case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI
204 52970 andreas.cz
                                            resumptionStr = xprResumptionPath.evaluate(resultNode);
205
                                            break;
206 52979 andreas.cz
207
                            case "count":   // begin at one step for all records
208 52970 andreas.cz
                                            resumptionStr = Integer.toString(resumptionInt);
209
                                            break;
210 52979 andreas.cz
211
                            case "discover":   // length of results unknown
212 52983 andreas.cz
                                            qUrlArgument = qUrl.getQuery();
213
                                            String[] arrayQUrlArgument = qUrlArgument.split("&");
214
                                            for(String arrayUrlArgStr : arrayQUrlArgument ) {
215
                                                if(arrayUrlArgStr.startsWith(resumptionParam)) {
216
                                                    String[] resumptionKeyValue = arrayUrlArgStr.split("=");
217
                                                    urlOldResumptionSize = Integer.parseInt(resumptionKeyValue[1]);
218
                                                    log.debug("discover OldResumptionSize from Url: " + urlOldResumptionSize);
219
                                                }
220
                                            }
221
222
                                            //
223
                                            if(   ( (emptyXml.toLowerCase()).equals(resultXml.toLowerCase()) )
224
                                               || ( (nodeList != null) && (nodeList.getLength() < resultSizeValue) )
225
                                              ) {
226 52982 andreas.cz
                                                // resumptionStr = "";
227 52983 andreas.cz
                                                if(nodeList != null) { discoverResultSize += nodeList.getLength(); }
228 52982 andreas.cz
                                                resultTotal   = discoverResultSize;
229 52970 andreas.cz
                                            } else {
230
                                                resumptionStr = Integer.toString(resumptionInt);
231
                                                resultTotal   = resumptionInt+1;
232 52982 andreas.cz
                                                if(nodeList != null) { discoverResultSize += nodeList.getLength(); }
233 52970 andreas.cz
                                            }
234 52982 andreas.cz
                                            log.info("discoverResultSize:  " + discoverResultSize);
235 52970 andreas.cz
                                            break;
236 52979 andreas.cz
237
                            default:        // otherwise: abort
238 52982 andreas.cz
                                            // resultTotal = resumptionInt;
239 52971 andreas.cz
                                            break;
240 52970 andreas.cz
                        }
241 52971 andreas.cz
242 50066 jochen.sch
			if (resultTotal == -1) {
243
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
244 52979 andreas.cz
				log.info("resultTotal was -1 is now: " + resultTotal);
245 50066 jochen.sch
			}
246 50584 claudio.at
			log.info("resultTotal: " + resultTotal);
247
			log.info("resInt: " + resumptionInt);
248 50066 jochen.sch
			if (resumptionInt < resultTotal) {
249 50582 jochen.sch
				nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
250 50584 claudio.at
			} else
251 50582 jochen.sch
				nextQuery = "";
252 52971 andreas.cz
253 52979 andreas.cz
                        log.debug("nextQueryUrl: " + nextQuery);
254 50582 jochen.sch
			return nextQuery;
255 50066 jochen.sch
256 50584 claudio.at
		} catch(Exception e) {
257
			log.error(e);
258
			throw new IllegalStateException("collection failed: " + e.getMessage());
259 50066 jochen.sch
		}
260
	}
261 52970 andreas.cz
262
        /**
263
         * convert in Json-KeyName 'whitespace(s)' to '_' and '/' to '_', '(' and ')' to ''
264
         *
265
         * @param jsonInput
266
         * @return
267
         */
268
        private String syntaxConvertJsonKeyNamens(String jsonInput) {
269 50066 jochen.sch
270 52979 andreas.cz
            log.trace("before convertJsonKeyNames: " + jsonInput);
271 52970 andreas.cz
            // replace ' 's in JSON Namens with '_'
272
            while (jsonInput.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")) {
273
                jsonInput = jsonInput.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
274
            }
275
276
            // replace forward-slash (sign '/' ) in JSON Names with '_'
277
            while (jsonInput.matches(".*\"([^\"]*)/([^\"]*)\":.*")) {
278
                jsonInput = jsonInput.replaceAll("\"([^\"]*)/([^\"]*)\":", "\"$1_$2\":");
279
            }
280
281
            // replace '(' in JSON Names with ''
282
            while (jsonInput.matches(".*\"([^\"]*)[(]([^\"]*)\":.*")) {
283
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[(]([^\"]*)\":", "\"$1$2\":");
284
            }
285
            // replace ')' in JSON Names with ''
286
            while (jsonInput.matches(".*\"([^\"]*)[)]([^\"]*)\":.*")) {
287
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[)]([^\"]*)\":", "\"$1$2\":");
288
            }
289
290 52979 andreas.cz
            log.trace("after syntaxConvertJsonKeyNames: " + jsonInput);
291 52970 andreas.cz
            return jsonInput;
292
        }
293
294 50066 jochen.sch
}