Project

General

Profile

1 50066 jochen.sch
/**
2
 *
3
 */
4
package eu.dnetlib.data.collector.plugins.rest;
5
6
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URL;
import java.util.Iterator;
import java.util.Locale;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.regex.Pattern;

import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerConfigurationException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpression;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

import eu.dnetlib.data.collector.plugins.oai.OaiIterator;
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
35
36 50066 jochen.sch
/**
37 52970 andreas.cz
 * @author Jochen Schirrwagen, Aenne Loehden, Andreas Czerniak
38
 * @date 2018-08-06
39 50066 jochen.sch
 *
40
 */
41
public class RestIterator implements Iterator<String> {
42
43 52971 andreas.cz
    // TODO: clean up the comments of replaced source code
44 50582 jochen.sch
	private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
45
46 50066 jochen.sch
	private static final String wrapName = "recordWrap";
47
	private String baseUrl;
48
	private String resumptionType;
49
	private String resumptionParam;
50
	private String resultFormatValue;
51
	private String queryParams;
52 52970 andreas.cz
	private int resultSizeValue;
53 50066 jochen.sch
	private int resumptionInt = 0;			// integer resumption token (first record to harvest)
54
	private int resultTotal = -1;
55
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
56
	private InputStream resultStream;
57
	private Transformer transformer;
58
	private XPath xpath;
59 50582 jochen.sch
	private String query;
60 50066 jochen.sch
	private XPathExpression xprResultTotalPath;
61
	private XPathExpression xprResumptionPath;
62 50582 jochen.sch
	private XPathExpression xprEntity;
63 50066 jochen.sch
	private String queryFormat;
64
	private String querySize;
65 50582 jochen.sch
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
66 50066 jochen.sch
67
	public RestIterator(
68
			final String baseUrl,
69
			final String resumptionType,
70
			final String resumptionParam,
71
			final String resumptionXpath,
72
			final String resultTotalXpath,
73
			final String resultFormatParam,
74
			final String resultFormatValue,
75
			final String resultSizeParam,
76 52971 andreas.cz
                        final String resultSizeValueStr,
77 50582 jochen.sch
			final String queryParams,
78
			final String entityXpath
79 50066 jochen.sch
			) {
80
		this.baseUrl = baseUrl;
81
		this.resumptionType = resumptionType;
82
		this.resumptionParam = resumptionParam;
83
		this.resultFormatValue = resultFormatValue;
84
		this.queryParams = queryParams;
85 52971 andreas.cz
                this.resultSizeValue = Integer.valueOf(resultSizeValueStr);
86
87 50584 claudio.at
        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
88 52971 andreas.cz
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValueStr : "";
89 50066 jochen.sch
90
		try {
91 50582 jochen.sch
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
92 50584 claudio.at
		} catch(Exception e) {
93
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
94 50066 jochen.sch
		}
95
        initQueue();
96
	}
97
98 50582 jochen.sch
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) throws TransformerConfigurationException, XPathExpressionException{
99 50066 jochen.sch
		transformer = TransformerFactory.newInstance().newTransformer();
100 52970 andreas.cz
                transformer.setOutputProperty(OutputKeys.INDENT,"yes");
101
                transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount","3");
102
		xpath              = XPathFactory.newInstance().newXPath();
103 50066 jochen.sch
		xprResultTotalPath = xpath.compile(resultTotalXpath);
104 52970 andreas.cz
		xprResumptionPath  = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
105
		xprEntity          = xpath.compile(entityXpath);
106 50066 jochen.sch
	}
107
108
	private void initQueue() {
109 50582 jochen.sch
		query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
110 50066 jochen.sch
	}
111
112
	private void disconnect() {
113
		// TODO close inputstream
114
	}
115
116
	/* (non-Javadoc)
117
	 * @see java.util.Iterator#hasNext()
118
	 */
119
	@Override
120
	public boolean hasNext() {
121 50582 jochen.sch
		if (recordQueue.isEmpty() && query.isEmpty()) {
122 50066 jochen.sch
			disconnect();
123
			return false;
124
		} else {
125
			return true;
126
		}
127
	}
128
129
	/* (non-Javadoc)
130
	 * @see java.util.Iterator#next()
131
	 */
132
	@Override
133
	public String next() {
134 50582 jochen.sch
		synchronized (recordQueue) {
135
			while (recordQueue.isEmpty() && !query.isEmpty() ) {
136
				try {
137
					query = downloadPage(query);
138 50584 claudio.at
				} catch(CollectorServiceException e) {
139 50582 jochen.sch
					throw new RuntimeException(e);
140
				}
141
			}
142
			return recordQueue.poll();
143
		}
144
	}
145
146
147
	/*
148
	 * download page and return nextQuery
149
	 */
150
	private String downloadPage(String query) throws CollectorServiceException{
151 50066 jochen.sch
		String resultJson;
152
		String resultXml = "";
153 50582 jochen.sch
		String nextQuery = "";
154 52979 andreas.cz
                String emptyXml = "<"+wrapName+"></"+wrapName+">";
155
                Node resultNode = null;
156 52982 andreas.cz
                NodeList nodeList = null;
157
                int discoverResultSize = 0;
158 52979 andreas.cz
159 50066 jochen.sch
		try {
160 52971 andreas.cz
                        resultStream = new URL(query).openStream();
161 52970 andreas.cz
			if("json".equals(resultFormatValue.toLowerCase())){
162 50066 jochen.sch
				resultJson = IOUtils.toString(resultStream,"UTF-8");
163 50584 claudio.at
164
				//TODO move regex definitions as constant fields
165 50066 jochen.sch
				// pre-clean json - rid spaces of element names (misinterpreted as elements with attributes in xml)
166 52970 andreas.cz
				resultJson = syntaxConvertJsonKeyNamens(resultJson);
167
//                                while(resultJson.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")){
168
//					resultJson = resultJson.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
169
//				}
170 50066 jochen.sch
				org.json.JSONObject jsonObject = new org.json.JSONObject(resultJson);
171
				resultXml = org.json.XML.toString(jsonObject,wrapName); // wrap xml in single root element
172 52979 andreas.cz
				log.trace(resultXml);
173 50066 jochen.sch
				resultStream = IOUtils.toInputStream(resultXml,"UTF-8");
174
			}
175
176 52979 andreas.cz
                        if (!(emptyXml.toLowerCase()).equals(resultXml.toLowerCase())) {
177
                            resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
178
                            nodeList   = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
179 52982 andreas.cz
                            log.debug("nodeList.length: " + nodeList.getLength());
180 52979 andreas.cz
                            for (int i = 0; i < nodeList.getLength(); i++) {
181
                                StringWriter sw = new StringWriter();
182
                                transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
183
                                recordQueue.add(sw.toString());
184
                            }
185 52982 andreas.cz
                        } else { log.info("resultXml is equal with emptyXml"); }
186
187 50066 jochen.sch
			resumptionInt += resultSizeValue;
188 52970 andreas.cz
189 52971 andreas.cz
/*	replaced by switch statement as follow
190
                        if("scan".equals(resumptionType.toLowerCase())) { resumptionStr = xprResumptionPath.evaluate(resultNode);}
191
			if("count".equals(resumptionType.toLowerCase())){ resumptionStr = Integer.toString(resumptionInt); }
192
*/
193 52970 andreas.cz
                        switch(resumptionType.toLowerCase()) {
194 52979 andreas.cz
                            case "scan":    // read of resumptionToken , evaluate next results, e.g. OAI
195 52970 andreas.cz
                                            resumptionStr = xprResumptionPath.evaluate(resultNode);
196
                                            break;
197 52979 andreas.cz
198
                            case "count":   // begin at one step for all records
199 52970 andreas.cz
                                            resumptionStr = Integer.toString(resumptionInt);
200
                                            break;
201 52979 andreas.cz
202
                            case "discover":   // length of results unknown
203 52970 andreas.cz
                                            if( (emptyXml.toLowerCase()).equals(resultXml.toLowerCase()) ) {
204 52982 andreas.cz
                                                // resumptionStr = "";
205
                                                resultTotal   = discoverResultSize;
206 52970 andreas.cz
                                            } else {
207
                                                resumptionStr = Integer.toString(resumptionInt);
208
                                                resultTotal   = resumptionInt+1;
209 52982 andreas.cz
                                                if(nodeList != null) { discoverResultSize += nodeList.getLength(); }
210 52970 andreas.cz
                                            }
211 52982 andreas.cz
                                            log.info("discoverResultSize:  " + discoverResultSize);
212 52970 andreas.cz
                                            break;
213 52979 andreas.cz
214
                            default:        // otherwise: abort
215 52982 andreas.cz
                                            // resultTotal = resumptionInt;
216 52971 andreas.cz
                                            break;
217 52970 andreas.cz
                        }
218 52971 andreas.cz
219 50066 jochen.sch
			if (resultTotal == -1) {
220
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
221 52979 andreas.cz
				log.info("resultTotal was -1 is now: " + resultTotal);
222 50066 jochen.sch
			}
223 50584 claudio.at
			log.info("resultTotal: " + resultTotal);
224
			log.info("resInt: " + resumptionInt);
225 50066 jochen.sch
			if (resumptionInt < resultTotal) {
226 50582 jochen.sch
				nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
227 50584 claudio.at
			} else
228 50582 jochen.sch
				nextQuery = "";
229 52971 andreas.cz
230 52979 andreas.cz
                        log.debug("nextQueryUrl: " + nextQuery);
231 50582 jochen.sch
			return nextQuery;
232 50066 jochen.sch
233 50584 claudio.at
		} catch(Exception e) {
234
			log.error(e);
235
			throw new IllegalStateException("collection failed: " + e.getMessage());
236 50066 jochen.sch
		}
237
	}
238 52970 andreas.cz
239
        /**
240
         * convert in Json-KeyName 'whitespace(s)' to '_' and '/' to '_', '(' and ')' to ''
241
         *
242
         * @param jsonInput
243
         * @return
244
         */
245
        private String syntaxConvertJsonKeyNamens(String jsonInput) {
246 50066 jochen.sch
247 52979 andreas.cz
            log.trace("before convertJsonKeyNames: " + jsonInput);
248 52970 andreas.cz
            // replace ' 's in JSON Namens with '_'
249
            while (jsonInput.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")) {
250
                jsonInput = jsonInput.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
251
            }
252
253
            // replace forward-slash (sign '/' ) in JSON Names with '_'
254
            while (jsonInput.matches(".*\"([^\"]*)/([^\"]*)\":.*")) {
255
                jsonInput = jsonInput.replaceAll("\"([^\"]*)/([^\"]*)\":", "\"$1_$2\":");
256
            }
257
258
            // replace '(' in JSON Names with ''
259
            while (jsonInput.matches(".*\"([^\"]*)[(]([^\"]*)\":.*")) {
260
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[(]([^\"]*)\":", "\"$1$2\":");
261
            }
262
            // replace ')' in JSON Names with ''
263
            while (jsonInput.matches(".*\"([^\"]*)[)]([^\"]*)\":.*")) {
264
                jsonInput = jsonInput.replaceAll("\"([^\"]*)[)]([^\"]*)\":", "\"$1$2\":");
265
            }
266
267 52979 andreas.cz
            log.trace("after syntaxConvertJsonKeyNames: " + jsonInput);
268 52970 andreas.cz
            return jsonInput;
269
        }
270
271 50066 jochen.sch
}