Revision 50981
Added by Claudio Atzori about 6 years ago
modules/cnr-enabling-database-service/trunk/src/main/java/eu/dnetlib/enabling/database/utils/DatabaseUtils.java | ||
---|---|---|
151 | 151 |
if (clazz == BlockingQueue.class) { |
152 | 152 |
log.debug("Creating Queue"); |
153 | 153 |
|
154 |
final ArrayBlockingQueue<Map<String, Object>> queue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
|
|
154 |
final ArrayBlockingQueue<Document> queue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
|
|
155 | 155 |
Executors.newSingleThreadExecutor().submit(() -> { |
156 | 156 |
final DataSource ds = dataSourceFactory.createDataSource(dbName); |
157 | 157 |
try (final Connection con = getConnection(ds); |
... | ... | |
179 | 179 |
} |
180 | 180 |
// An empty Map indicates the end of the resultset |
181 | 181 |
enqueue(queue, new HashMap<>()); |
182 |
} catch (SQLException e) { |
|
182 |
} catch (SQLException | DatabaseException e) {
|
|
183 | 183 |
throw new RuntimeException(e); |
184 | 184 |
} |
185 | 185 |
}); |
... | ... | |
200 | 200 |
} |
201 | 201 |
} |
202 | 202 |
|
203 |
private boolean enqueue(final ArrayBlockingQueue<Map<String, Object>> q, final Map<String, Object> row) {
|
|
203 |
private boolean enqueue(final ArrayBlockingQueue<Document> q, final Map<String, Object> row) throws DatabaseException {
|
|
204 | 204 |
try { |
205 |
return q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS); |
|
205 |
return q.offer(rowToDocument(row), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
|
|
206 | 206 |
} catch (InterruptedException e) { |
207 | 207 |
log.error("Error putting element in queue"); |
208 | 208 |
throw new RuntimeException(e); |
... | ... | |
286 | 286 |
} |
287 | 287 |
|
288 | 288 |
public Document rowToDocument(final Map<?, ?> map) throws DatabaseException { |
289 |
Document doc = DocumentHelper.createDocument(); |
|
289 |
final Document doc = DocumentHelper.createDocument(); |
|
290 |
final Element row = doc.addElement("ROW"); |
|
290 | 291 |
|
291 |
Element row = doc.addElement("ROW"); |
|
292 | 292 |
for (Map.Entry<?, ?> entry : map.entrySet()) { |
293 | 293 |
Element col = row.addElement("FIELD"); |
294 | 294 |
col.addAttribute("name", "" + entry.getKey()); |
... | ... | |
329 | 329 |
private void addValue(final Element elem, final Object value) throws DatabaseException { |
330 | 330 |
if (value instanceof Array) { |
331 | 331 |
try { |
332 |
for (Object o : (Object[]) ((Array) value).getArray()) { |
|
332 |
final Array arrayValue = (Array) value; |
|
333 |
for (Object o : (Object[]) arrayValue.getArray()) { |
|
333 | 334 |
addValue(elem.addElement("ITEM"), o); |
334 | 335 |
} |
335 | 336 |
} catch (Exception e) { |
337 |
log.error(e); |
|
336 | 338 |
throw new DatabaseException("Error procsessing a Array", e); |
337 | 339 |
} |
338 | 340 |
} else if (value != null) { |
modules/cnr-enabling-database-service/trunk/src/main/java/eu/dnetlib/enabling/database/resultset/IterableRowSet.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.enabling.database.resultset; |
2 | 2 |
|
3 | 3 |
import java.util.Iterator; |
4 |
import java.util.Map; |
|
5 | 4 |
import java.util.NoSuchElementException; |
6 | 5 |
import java.util.concurrent.BlockingQueue; |
7 | 6 |
import java.util.concurrent.TimeUnit; |
8 | 7 |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
|
|
12 | 8 |
import eu.dnetlib.enabling.database.rmi.DatabaseException; |
13 | 9 |
import eu.dnetlib.enabling.database.utils.DatabaseUtils; |
14 | 10 |
import eu.dnetlib.enabling.resultset.SizedIterable; |
11 |
import org.apache.commons.logging.Log; |
|
12 |
import org.apache.commons.logging.LogFactory; |
|
13 |
import org.dom4j.Document; |
|
15 | 14 |
|
16 | 15 |
public class IterableRowSet implements SizedIterable<String> { |
17 | 16 |
|
... | ... | |
35 | 34 |
@SuppressWarnings("unchecked") |
36 | 35 |
public Iterator<String> iterator() { |
37 | 36 |
try { |
38 |
final BlockingQueue<Map<String, Object>> queue = dbUtils.executeSql(db, sql, BlockingQueue.class);
|
|
37 |
final BlockingQueue<Document> queue = dbUtils.executeSql(db, sql, BlockingQueue.class);
|
|
39 | 38 |
return new QueueIterator(queue); |
40 | 39 |
} catch (Exception e) { |
41 | 40 |
throw new RuntimeException("Error creating iterator for query: " + sql, e); |
... | ... | |
61 | 60 |
|
62 | 61 |
private class QueueIterator implements Iterator<String> { |
63 | 62 |
|
64 |
private Map<String, Object> curr;
|
|
65 |
private BlockingQueue<Map<String, Object>> queue;
|
|
63 |
private Document curr;
|
|
64 |
private BlockingQueue<Document> queue;
|
|
66 | 65 |
|
67 |
public QueueIterator(final BlockingQueue<Map<String, Object>> queue) throws InterruptedException {
|
|
66 |
public QueueIterator(final BlockingQueue<Document> queue) throws InterruptedException {
|
|
68 | 67 |
super(); |
69 | 68 |
this.queue = queue; |
70 | 69 |
this.curr = queue.poll(DatabaseUtils.BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS); |
... | ... | |
75 | 74 |
log.debug("Reading Next Element from queue"); |
76 | 75 |
|
77 | 76 |
try { |
78 |
if ((curr == null) || curr.isEmpty()) { throw new NoSuchElementException(
|
|
77 |
if ((curr == null) || !hasContent(curr)) { throw new NoSuchElementException(
|
|
79 | 78 |
"Value from queue is null or empty, probably the producer doesn't produce"); } |
80 |
String res = dbUtils.rowToDocument(curr).asXML();
|
|
79 |
String res = curr.asXML();
|
|
81 | 80 |
curr = queue.poll(DatabaseUtils.BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS); |
82 | 81 |
return res; |
83 | 82 |
} catch (Exception e) { |
... | ... | |
85 | 84 |
} |
86 | 85 |
} |
87 | 86 |
|
87 |
private boolean hasContent(final Document curr) { |
|
88 |
return curr != null && Integer.parseInt(curr.valueOf("count(/ROW/*)")) > 0; |
|
89 |
} |
|
90 |
|
|
88 | 91 |
@Override |
89 | 92 |
public boolean hasNext() { |
90 | 93 |
// An empty Map indicates the end of the Queue |
91 |
return (curr != null) && !curr.isEmpty();
|
|
94 |
return (curr != null) && hasContent(curr);
|
|
92 | 95 |
} |
93 | 96 |
|
94 | 97 |
@Override |
Also available in: Unified diff
fixing connection closed errors with updated postgresql driver