Project

General

Profile

1 50066 jochen.sch
/**
2
 *
3
 */
4
package eu.dnetlib.data.collector.plugins.rest;
5
6
import java.io.InputStream;
7 50582 jochen.sch
import java.io.StringWriter;
8 50066 jochen.sch
import java.net.URL;
9
import java.util.Iterator;
10
import java.util.Queue;
11 50582 jochen.sch
import java.util.concurrent.PriorityBlockingQueue;
12 50066 jochen.sch
13
import javax.xml.transform.OutputKeys;
14
import javax.xml.transform.Transformer;
15
import javax.xml.transform.TransformerConfigurationException;
16
import javax.xml.transform.TransformerFactory;
17 50582 jochen.sch
import javax.xml.transform.dom.DOMSource;
18
import javax.xml.transform.stream.StreamResult;
19 50066 jochen.sch
import javax.xml.xpath.XPath;
20
import javax.xml.xpath.XPathConstants;
21
import javax.xml.xpath.XPathExpression;
22
import javax.xml.xpath.XPathExpressionException;
23
import javax.xml.xpath.XPathFactory;
24
25
import org.apache.commons.io.IOUtils;
26 50662 alessia.ba
import org.apache.commons.lang3.StringUtils;
27 50582 jochen.sch
import org.apache.commons.logging.Log;
28
import org.apache.commons.logging.LogFactory;
29 50066 jochen.sch
import org.w3c.dom.Node;
30 50582 jochen.sch
import org.w3c.dom.NodeList;
31 50066 jochen.sch
import org.xml.sax.InputSource;
32
33 50582 jochen.sch
import eu.dnetlib.data.collector.plugins.oai.OaiIterator;
34
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
35
36 50066 jochen.sch
/**
37
 * @author Jochen Schirrwagen, Aenne Loehden
38
 *
39
 */
40
public class RestIterator implements Iterator<String> {
41
42 50582 jochen.sch
	private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
43
44 50066 jochen.sch
	private static final String wrapName = "recordWrap";
45
	private String baseUrl;
46
	private String resumptionType;
47
	private String resumptionParam;
48
	private String resultFormatValue;
49
	private String queryParams;
50
	private int resultSizeValue = 100;
51
	private int resumptionInt = 0;			// integer resumption token (first record to harvest)
52
	private int resultTotal = -1;
53
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
54
	private InputStream resultStream;
55
	private Transformer transformer;
56
	private XPath xpath;
57 50582 jochen.sch
	private String query;
58 50066 jochen.sch
	private XPathExpression xprResultTotalPath;
59
	private XPathExpression xprResumptionPath;
60 50582 jochen.sch
	private XPathExpression xprEntity;
61 50066 jochen.sch
	private String queryFormat;
62
	private String querySize;
63 50582 jochen.sch
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
64 50066 jochen.sch
65
	public RestIterator(
66
			final String baseUrl,
67
			final String resumptionType,
68
			final String resumptionParam,
69
			final String resumptionXpath,
70
			final String resultTotalXpath,
71
			final String resultFormatParam,
72
			final String resultFormatValue,
73
			final String resultSizeParam,
74 50582 jochen.sch
			final String queryParams,
75
			final String entityXpath
76 50066 jochen.sch
			) {
77
		this.baseUrl = baseUrl;
78
		this.resumptionType = resumptionType;
79
		this.resumptionParam = resumptionParam;
80
		this.resultFormatValue = resultFormatValue;
81
		this.queryParams = queryParams;
82
83 50584 claudio.at
        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
84
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValue : "";
85 50066 jochen.sch
86
		try {
87 50582 jochen.sch
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
88 50584 claudio.at
		} catch(Exception e) {
89
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
90 50066 jochen.sch
		}
91
        initQueue();
92
	}
93
94 50582 jochen.sch
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) throws TransformerConfigurationException, XPathExpressionException{
95 50066 jochen.sch
		transformer = TransformerFactory.newInstance().newTransformer();
96
        transformer.setOutputProperty(OutputKeys.INDENT,"yes");
97
        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount","3");
98
		xpath = XPathFactory.newInstance().newXPath();
99
		xprResultTotalPath = xpath.compile(resultTotalXpath);
100 50584 claudio.at
		xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
101 50582 jochen.sch
		xprEntity = xpath.compile(entityXpath);
102 50066 jochen.sch
	}
103
104
	private void initQueue() {
105 50582 jochen.sch
		query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
106 50066 jochen.sch
	}
107
108
	private void disconnect() {
109
		// TODO close inputstream
110
	}
111
112
	/* (non-Javadoc)
113
	 * @see java.util.Iterator#hasNext()
114
	 */
115
	@Override
116
	public boolean hasNext() {
117 50582 jochen.sch
		if (recordQueue.isEmpty() && query.isEmpty()) {
118 50066 jochen.sch
			disconnect();
119
			return false;
120
		} else {
121
			return true;
122
		}
123
	}
124
125
	/* (non-Javadoc)
126
	 * @see java.util.Iterator#next()
127
	 */
128
	@Override
129
	public String next() {
130 50582 jochen.sch
		synchronized (recordQueue) {
131
			while (recordQueue.isEmpty() && !query.isEmpty() ) {
132
				try {
133
					query = downloadPage(query);
134 50584 claudio.at
				} catch(CollectorServiceException e) {
135 50582 jochen.sch
					throw new RuntimeException(e);
136
				}
137
			}
138
			return recordQueue.poll();
139
		}
140
	}
141
142
143
	/*
144
	 * download page and return nextQuery
145
	 */
146
	private String downloadPage(String query) throws CollectorServiceException{
147 50066 jochen.sch
		String resultJson;
148
		String resultXml = "";
149 50582 jochen.sch
		String nextQuery = "";
150 50066 jochen.sch
		try {
151 50582 jochen.sch
            resultStream = new URL(query).openStream();
152 50584 claudio.at
			if("json".equals(resultFormatValue)){
153 50066 jochen.sch
				resultJson = IOUtils.toString(resultStream,"UTF-8");
154 50584 claudio.at
155
				//TODO move regex definitions as constant fields
156 50066 jochen.sch
				// pre-clean json - rid spaces of element names (misinterpreted as elements with attributes in xml)
157
				while(resultJson.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")){
158
					resultJson = resultJson.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
159
				}
160
				org.json.JSONObject jsonObject = new org.json.JSONObject(resultJson);
161
				resultXml = org.json.XML.toString(jsonObject,wrapName); // wrap xml in single root element
162 50584 claudio.at
//				log.info(resultXml);
163 50066 jochen.sch
				resultStream = IOUtils.toInputStream(resultXml,"UTF-8");
164
			}
165
166 50584 claudio.at
			Node resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
167 50582 jochen.sch
			NodeList nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
168
169
			for (int i = 0; i < nodeList.getLength(); i++) {
170
				StringWriter sw = new StringWriter();
171 50584 claudio.at
				transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
172 50582 jochen.sch
				recordQueue.add(sw.toString());
173
			}
174
175 50066 jochen.sch
			resumptionInt += resultSizeValue;
176 50584 claudio.at
			if("scan".equals(resumptionType)) { resumptionStr = xprResumptionPath.evaluate(resultNode);}
177
			if("count".equals(resumptionType)){ resumptionStr = Integer.toString(resumptionInt); }
178 50066 jochen.sch
179
			if (resultTotal == -1) {
180
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
181 50584 claudio.at
				log.info("resultTotal: " + resultTotal);
182 50066 jochen.sch
			}
183 50584 claudio.at
			log.info("resultTotal: " + resultTotal);
184
			log.info("resInt: " + resumptionInt);
185 50066 jochen.sch
			if (resumptionInt < resultTotal) {
186 50582 jochen.sch
				nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
187 50584 claudio.at
			} else
188 50582 jochen.sch
				nextQuery = "";
189
			return nextQuery;
190 50066 jochen.sch
191 50584 claudio.at
		} catch(Exception e) {
192
			log.error(e);
193
			throw new IllegalStateException("collection failed: " + e.getMessage());
194 50066 jochen.sch
		}
195
	}
196
197
}