Revision 50981
Added by Claudio Atzori about 6 years ago
IterableRowSet.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.enabling.database.resultset; |
2 | 2 |
|
3 | 3 |
import java.util.Iterator; |
4 |
import java.util.Map; |
|
5 | 4 |
import java.util.NoSuchElementException; |
6 | 5 |
import java.util.concurrent.BlockingQueue; |
7 | 6 |
import java.util.concurrent.TimeUnit; |
8 | 7 |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
|
|
12 | 8 |
import eu.dnetlib.enabling.database.rmi.DatabaseException; |
13 | 9 |
import eu.dnetlib.enabling.database.utils.DatabaseUtils; |
14 | 10 |
import eu.dnetlib.enabling.resultset.SizedIterable; |
11 |
import org.apache.commons.logging.Log; |
|
12 |
import org.apache.commons.logging.LogFactory; |
|
13 |
import org.dom4j.Document; |
|
15 | 14 |
|
16 | 15 |
public class IterableRowSet implements SizedIterable<String> { |
17 | 16 |
|
... | ... | |
35 | 34 |
@SuppressWarnings("unchecked") |
36 | 35 |
public Iterator<String> iterator() { |
37 | 36 |
try { |
38 |
final BlockingQueue<Map<String, Object>> queue = dbUtils.executeSql(db, sql, BlockingQueue.class);
|
|
37 |
final BlockingQueue<Document> queue = dbUtils.executeSql(db, sql, BlockingQueue.class);
|
|
39 | 38 |
return new QueueIterator(queue); |
40 | 39 |
} catch (Exception e) { |
41 | 40 |
throw new RuntimeException("Error creating iterator for query: " + sql, e); |
... | ... | |
61 | 60 |
|
62 | 61 |
private class QueueIterator implements Iterator<String> { |
63 | 62 |
|
64 |
private Map<String, Object> curr;
|
|
65 |
private BlockingQueue<Map<String, Object>> queue;
|
|
63 |
private Document curr;
|
|
64 |
private BlockingQueue<Document> queue;
|
|
66 | 65 |
|
67 |
public QueueIterator(final BlockingQueue<Map<String, Object>> queue) throws InterruptedException {
|
|
66 |
public QueueIterator(final BlockingQueue<Document> queue) throws InterruptedException {
|
|
68 | 67 |
super(); |
69 | 68 |
this.queue = queue; |
70 | 69 |
this.curr = queue.poll(DatabaseUtils.BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS); |
... | ... | |
75 | 74 |
log.debug("Reading Next Element from queue"); |
76 | 75 |
|
77 | 76 |
try { |
78 |
if ((curr == null) || curr.isEmpty()) { throw new NoSuchElementException(
|
|
77 |
if ((curr == null) || !hasContent(curr)) { throw new NoSuchElementException(
|
|
79 | 78 |
"Value from queue is null or empty, probably the producer doesn't produce"); } |
80 |
String res = dbUtils.rowToDocument(curr).asXML();
|
|
79 |
String res = curr.asXML();
|
|
81 | 80 |
curr = queue.poll(DatabaseUtils.BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS); |
82 | 81 |
return res; |
83 | 82 |
} catch (Exception e) { |
... | ... | |
85 | 84 |
} |
86 | 85 |
} |
87 | 86 |
|
87 |
private boolean hasContent(final Document curr) { |
|
88 |
return curr != null && Integer.parseInt(curr.valueOf("count(/ROW/*)")) > 0; |
|
89 |
} |
|
90 |
|
|
88 | 91 |
@Override |
89 | 92 |
public boolean hasNext() { |
90 | 93 |
// An empty Map indicates the end of the Queue |
91 |
return (curr != null) && !curr.isEmpty();
|
|
94 |
return (curr != null) && hasContent(curr);
|
|
92 | 95 |
} |
93 | 96 |
|
94 | 97 |
@Override |
Also available in: Unified diff
fixing connection closed errors with updated postgresql driver