Project

General

Profile

1
/**
2
 * 
3
 */
4
package eu.dnetlib.data.collector.plugins.rest;
5

    
6
import java.io.InputStream;
7
import java.io.StringWriter;
8
import java.net.URL;
9
import java.util.Iterator;
10
import java.util.Queue;
11
import java.util.concurrent.PriorityBlockingQueue;
12

    
13
import javax.xml.transform.OutputKeys;
14
import javax.xml.transform.Transformer;
15
import javax.xml.transform.TransformerConfigurationException;
16
import javax.xml.transform.TransformerFactory;
17
import javax.xml.transform.dom.DOMSource;
18
import javax.xml.transform.stream.StreamResult;
19
import javax.xml.xpath.XPath;
20
import javax.xml.xpath.XPathConstants;
21
import javax.xml.xpath.XPathExpression;
22
import javax.xml.xpath.XPathExpressionException;
23
import javax.xml.xpath.XPathFactory;
24

    
25
import org.apache.commons.io.IOUtils;
26
import org.apache.commons.lang3.StringUtils;
27
import org.apache.commons.logging.Log;
28
import org.apache.commons.logging.LogFactory;
29
import org.w3c.dom.Node;
30
import org.w3c.dom.NodeList;
31
import org.xml.sax.InputSource;
32

    
33
import eu.dnetlib.data.collector.plugins.oai.OaiIterator;
34
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
35

    
36
/**
37
 * @author Jochen Schirrwagen, Aenne Loehden
38
 *
39
 */
40
public class RestIterator implements Iterator<String> {
41

    
42
	private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
43

    
44
	private static final String wrapName = "recordWrap";
45
	private String baseUrl;
46
	private String resumptionType;
47
	private String resumptionParam;
48
	private String resultFormatValue;
49
	private String queryParams;
50
	private int resultSizeValue = 100;
51
	private int resumptionInt = 0;			// integer resumption token (first record to harvest)
52
	private int resultTotal = -1;
53
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
54
	private InputStream resultStream;
55
	private Transformer transformer;
56
	private XPath xpath;
57
	private String query;
58
	private XPathExpression xprResultTotalPath;
59
	private XPathExpression xprResumptionPath;
60
	private XPathExpression xprEntity;
61
	private String queryFormat;
62
	private String querySize;
63
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
64
	
65
	public RestIterator(
66
			final String baseUrl,
67
			final String resumptionType,
68
			final String resumptionParam,
69
			final String resumptionXpath,
70
			final String resultTotalXpath,
71
			final String resultFormatParam,
72
			final String resultFormatValue,
73
			final String resultSizeParam,
74
			final String queryParams,
75
			final String entityXpath
76
			) {
77
		this.baseUrl = baseUrl;
78
		this.resumptionType = resumptionType;
79
		this.resumptionParam = resumptionParam;
80
		this.resultFormatValue = resultFormatValue;
81
		this.queryParams = queryParams;
82
		
83
        queryFormat = StringUtils.isNotBlank(resultFormatParam) ? "&" + resultFormatParam + "=" + resultFormatValue : "";
84
        querySize = StringUtils.isNotBlank(resultSizeParam) ? "&" + resultSizeParam + "=" + resultSizeValue : "";
85

    
86
		try {
87
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
88
		} catch(Exception e) {
89
			throw new IllegalStateException("xml transformation init failed: " + e.getMessage());
90
		}
91
        initQueue();
92
	}
93
	
94
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) throws TransformerConfigurationException, XPathExpressionException{
95
		transformer = TransformerFactory.newInstance().newTransformer();
96
        transformer.setOutputProperty(OutputKeys.INDENT,"yes"); 
97
        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount","3");
98
		xpath = XPathFactory.newInstance().newXPath();
99
		xprResultTotalPath = xpath.compile(resultTotalXpath);
100
		xprResumptionPath = xpath.compile(StringUtils.isBlank(resumptionXpath) ? "/" : resumptionXpath);
101
		xprEntity = xpath.compile(entityXpath);
102
	}
103
	
104
	private void initQueue() {
105
		query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
106
	}
107
	
108
	private void disconnect() {
109
		// TODO close inputstream
110
	}
111
	
112
	/* (non-Javadoc)
113
	 * @see java.util.Iterator#hasNext()
114
	 */
115
	@Override
116
	public boolean hasNext() {
117
		if (recordQueue.isEmpty() && query.isEmpty()) {
118
			disconnect();
119
			return false;
120
		} else {
121
			return true;
122
		}
123
	}
124

    
125
	/* (non-Javadoc)
126
	 * @see java.util.Iterator#next()
127
	 */
128
	@Override
129
	public String next() {
130
		synchronized (recordQueue) {
131
			while (recordQueue.isEmpty() && !query.isEmpty() ) {
132
				try {
133
					query = downloadPage(query);
134
				} catch(CollectorServiceException e) {
135
					throw new RuntimeException(e);
136
				}
137
			}
138
			return recordQueue.poll();
139
		}
140
	}
141
	
142
	
143
	/*
144
	 * download page and return nextQuery
145
	 */
146
	private String downloadPage(String query) throws CollectorServiceException{
147
		String resultJson;
148
		String resultXml = "";
149
		String nextQuery = "";
150
		try {
151
            resultStream = new URL(query).openStream();
152
			if("json".equals(resultFormatValue)){				
153
				resultJson = IOUtils.toString(resultStream,"UTF-8");
154

    
155
				//TODO move regex definitions as constant fields
156
				// pre-clean json - rid spaces of element names (misinterpreted as elements with attributes in xml)
157
				while(resultJson.matches(".*\"([^\"]*)\\s+([^\"]*)\":.*")){
158
					resultJson = resultJson.replaceAll("\"([^\"]*)\\s+([^\"]*)\":", "\"$1_$2\":");
159
				}
160
				org.json.JSONObject jsonObject = new org.json.JSONObject(resultJson);
161
				resultXml = org.json.XML.toString(jsonObject,wrapName); // wrap xml in single root element
162
//				log.info(resultXml);
163
				resultStream = IOUtils.toInputStream(resultXml,"UTF-8");
164
			}
165
			
166
			Node resultNode = (Node) xpath.evaluate("/", new InputSource(resultStream), XPathConstants.NODE);
167
			NodeList nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
168
			
169
			for (int i = 0; i < nodeList.getLength(); i++) {
170
				StringWriter sw = new StringWriter();
171
				transformer.transform(new DOMSource(nodeList.item(i)), new StreamResult(sw));
172
				recordQueue.add(sw.toString());
173
			}
174
				
175
			resumptionInt += resultSizeValue;
176
			if("scan".equals(resumptionType)) { resumptionStr = xprResumptionPath.evaluate(resultNode);}
177
			if("count".equals(resumptionType)){ resumptionStr = Integer.toString(resumptionInt); }
178

    
179
			if (resultTotal == -1) {
180
				resultTotal = Integer.parseInt(xprResultTotalPath.evaluate(resultNode));
181
				log.info("resultTotal: " + resultTotal);
182
			}
183
			log.info("resultTotal: " + resultTotal);
184
			log.info("resInt: " + resumptionInt);
185
			if (resumptionInt < resultTotal) {
186
				nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
187
			} else
188
				nextQuery = "";
189
			return nextQuery;
190

    
191
		} catch(Exception e) {
192
			log.error(e);
193
			throw new IllegalStateException("collection failed: " + e.getMessage());
194
		}
195
	}
196

    
197
}
(2-2/2)