Project

General

Profile

« Previous | Next » 

Revision 50582

fixed 'next' method of the iterator class and added new field entityPath

View differences:

RestIterator.java
4 4
package eu.dnetlib.data.collector.plugins.rest;
5 5

  
6 6
import java.io.InputStream;
7
import java.io.StringWriter;
7 8
import java.net.URL;
8 9
import java.util.Iterator;
9
import java.util.LinkedList;
10 10
import java.util.Queue;
11
import java.util.concurrent.PriorityBlockingQueue;
11 12

  
12 13
import javax.xml.transform.OutputKeys;
13 14
import javax.xml.transform.Transformer;
14 15
import javax.xml.transform.TransformerConfigurationException;
15 16
import javax.xml.transform.TransformerFactory;
17
import javax.xml.transform.dom.DOMSource;
18
import javax.xml.transform.stream.StreamResult;
16 19
import javax.xml.xpath.XPath;
17 20
import javax.xml.xpath.XPathConstants;
18 21
import javax.xml.xpath.XPathExpression;
......
20 23
import javax.xml.xpath.XPathFactory;
21 24

  
22 25
import org.apache.commons.io.IOUtils;
26
import org.apache.commons.logging.Log;
27
import org.apache.commons.logging.LogFactory;
23 28
import org.w3c.dom.Node;
29
import org.w3c.dom.NodeList;
24 30
import org.xml.sax.InputSource;
25 31

  
32
import eu.dnetlib.data.collector.plugins.oai.OaiIterator;
33
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
34

  
26 35
/**
27 36
 * @author Jochen Schirrwagen, Aenne Loehden
28 37
 *
29 38
 */
30 39
public class RestIterator implements Iterator<String> {
31 40

  
41
	private static final Log log = LogFactory.getLog(OaiIterator.class); // NOPMD by marko on 11/24/08 5:02 PM
42

  
32 43
	private static final String wrapName = "recordWrap";
33 44
	private String baseUrl;
34 45
	private String resumptionType;
......
36 47
	private String resultFormatValue;
37 48
	private String queryParams;
38 49
	private int resultSizeValue = 100;
39
	private Queue<String> queue;
40 50
	private int resumptionInt = 0;			// integer resumption token (first record to harvest)
41 51
	private int resultTotal = -1;
42 52
	private String resumptionStr = Integer.toString(resumptionInt);  // string resumption token (first record to harvest or token scanned from results)
43 53
	private InputStream resultStream;
44 54
	private Transformer transformer;
45 55
	private XPath xpath;
56
	private String query;
46 57
	private XPathExpression xprResultTotalPath;
47 58
	private XPathExpression xprResumptionPath;
59
	private XPathExpression xprEntity;
48 60
	private String queryFormat;
49 61
	private String querySize;
62
	private Queue<String> recordQueue = new PriorityBlockingQueue<String>();
50 63
	
51 64
	/*
52 65
	 * 
......
60 73
			final String resultFormatParam,
61 74
			final String resultFormatValue,
62 75
			final String resultSizeParam,
63
			final String queryParams
76
			final String queryParams,
77
			final String entityXpath
64 78
			) {
65 79
		this.baseUrl = baseUrl;
66 80
		this.resumptionType = resumptionType;
......
72 86
        querySize = (resultSizeParam!="")? "&" + resultSizeParam + "=" + resultSizeValue : "";
73 87

  
74 88
		try {
75
			initXmlTransformation(resultTotalXpath, resumptionXpath);
89
			initXmlTransformation(resultTotalXpath, resumptionXpath, entityXpath);
76 90
		}catch(Exception exp) {
77 91
			throw new IllegalStateException("xml transformation init failed: " + exp.getMessage());
78 92
		}
79 93
        initQueue();
80
        updateQueue();
81 94
	}
82 95
	
83
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath) throws TransformerConfigurationException, XPathExpressionException{
96
	private void initXmlTransformation(String resultTotalXpath, String resumptionXpath, String entityXpath) throws TransformerConfigurationException, XPathExpressionException{
84 97
		String resumpXpath = (resumptionXpath=="") ? "/" : resumptionXpath;
85 98

  
86 99
		transformer = TransformerFactory.newInstance().newTransformer();
......
89 102
		xpath = XPathFactory.newInstance().newXPath();
90 103
		xprResultTotalPath = xpath.compile(resultTotalXpath);
91 104
		xprResumptionPath = xpath.compile(resumpXpath);
105
		xprEntity = xpath.compile(entityXpath);
92 106
	}
93 107
	
94 108
	private void initQueue() {
95
		queue = new LinkedList<String>();
109
		query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
96 110
	}
97 111
	
98 112
	private void disconnect() {
99 113
		// TODO close inputstream
100 114
	}
101 115
	
102
	private void updateQueue() {
103
        String query = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
104
        System.out.println("query: " + query);
105
        queue.add(query);
106
	}
107
	
108 116
	/* (non-Javadoc)
109 117
	 * @see java.util.Iterator#hasNext()
110 118
	 */
111 119
	@Override
112 120
	public boolean hasNext() {
113
		if (queue.isEmpty()) {
121
		if (recordQueue.isEmpty() && query.isEmpty()) {
114 122
			disconnect();
115 123
			return false;
116 124
		} else {
......
124 132
	@Override
125 133
	public String next() {
126 134
		// TODO Auto-generated method stub
127
		String nextQuery = queue.remove();
135
		
136
		synchronized (recordQueue) {
137
			while (recordQueue.isEmpty() && !query.isEmpty() ) {
138
				try {
139
					query = downloadPage(query);
140
				}catch(CollectorServiceException e) {
141
					throw new RuntimeException(e);
142
				}
143
			}
144
			return recordQueue.poll();
145
		}
146
	}
147
	
148
	
149
	/*
150
	 * download page and return nextQuery
151
	 */
152
	private String downloadPage(String query) throws CollectorServiceException{
128 153
		String resultJson;
129 154
		String resultXml = "";
155
		String nextQuery = "";
130 156
		try {
131
            resultStream = new URL(nextQuery).openStream();
157
            resultStream = new URL(query).openStream();
132 158
			if(resultFormatValue == "json"){				
133 159
				resultJson = IOUtils.toString(resultStream,"UTF-8");
134 160
				// pre-clean json - rid spaces of element names (misinterpreted as elements with attributes in xml)
......
144 170
			InputSource inSource = new InputSource(resultStream);
145 171

  
146 172
			Node resultNode = (Node) xpath.evaluate("/", inSource, XPathConstants.NODE);
173

  
174
			NodeList nodeList = (NodeList) xprEntity.evaluate(resultNode, XPathConstants.NODESET);
175
			
176
			for (int i = 0; i < nodeList.getLength(); i++) {
177
				Node n = nodeList.item(i);
178
				StringWriter sw = new StringWriter();
179
				transformer.transform(new DOMSource(n), new StreamResult(sw));
180
				recordQueue.add(sw.toString());
181
			}
182
				
147 183
			resumptionInt += resultSizeValue;
148 184
			if(resumptionType=="scan"){ resumptionStr = xprResumptionPath.evaluate(resultNode);}
149 185
			if(resumptionType=="count"){ resumptionStr = Integer.toString(resumptionInt); }
......
155 191
			System.out.println("resultTotal: " + resultTotal);
156 192
			System.out.println("resInt: " + resumptionInt);
157 193
			if (resumptionInt < resultTotal) {
158
				updateQueue();
159
			}
160
			return resultXml;
194
				nextQuery = baseUrl + "?" + queryParams + querySize + "&" + resumptionParam + "=" + resumptionStr + queryFormat;
195
			}else
196
				nextQuery = "";
197
			return nextQuery;
161 198

  
162 199
		}catch(Exception exc) {
163 200
			exc.printStackTrace(System.err);
164 201
			throw new IllegalStateException("collection failed: " + exc.getMessage());
165 202
		}
203

  
166 204
	}
167 205

  
168 206
}

Also available in: Unified diff