Revision 50981
Added by Claudio Atzori about 6 years ago
DatabaseUtils.java | ||
---|---|---|
151 | 151 |
if (clazz == BlockingQueue.class) { |
152 | 152 |
log.debug("Creating Queue"); |
153 | 153 |
|
154 |
final ArrayBlockingQueue<Map<String, Object>> queue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
|
|
154 |
final ArrayBlockingQueue<Document> queue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
|
|
155 | 155 |
Executors.newSingleThreadExecutor().submit(() -> { |
156 | 156 |
final DataSource ds = dataSourceFactory.createDataSource(dbName); |
157 | 157 |
try (final Connection con = getConnection(ds); |
... | ... | |
179 | 179 |
} |
180 | 180 |
// An empty Map indicates the end of the resultset |
181 | 181 |
enqueue(queue, new HashMap<>()); |
182 |
} catch (SQLException e) { |
|
182 |
} catch (SQLException | DatabaseException e) {
|
|
183 | 183 |
throw new RuntimeException(e); |
184 | 184 |
} |
185 | 185 |
}); |
... | ... | |
200 | 200 |
} |
201 | 201 |
} |
202 | 202 |
|
203 |
private boolean enqueue(final ArrayBlockingQueue<Map<String, Object>> q, final Map<String, Object> row) {
|
|
203 |
private boolean enqueue(final ArrayBlockingQueue<Document> q, final Map<String, Object> row) throws DatabaseException {
|
|
204 | 204 |
try { |
205 |
return q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS); |
|
205 |
return q.offer(rowToDocument(row), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
|
|
206 | 206 |
} catch (InterruptedException e) { |
207 | 207 |
log.error("Error putting element in queue"); |
208 | 208 |
throw new RuntimeException(e); |
... | ... | |
286 | 286 |
} |
287 | 287 |
|
288 | 288 |
public Document rowToDocument(final Map<?, ?> map) throws DatabaseException { |
289 |
Document doc = DocumentHelper.createDocument(); |
|
289 |
final Document doc = DocumentHelper.createDocument(); |
|
290 |
final Element row = doc.addElement("ROW"); |
|
290 | 291 |
|
291 |
Element row = doc.addElement("ROW"); |
|
292 | 292 |
for (Map.Entry<?, ?> entry : map.entrySet()) { |
293 | 293 |
Element col = row.addElement("FIELD"); |
294 | 294 |
col.addAttribute("name", "" + entry.getKey()); |
... | ... | |
329 | 329 |
private void addValue(final Element elem, final Object value) throws DatabaseException { |
330 | 330 |
if (value instanceof Array) { |
331 | 331 |
try { |
332 |
for (Object o : (Object[]) ((Array) value).getArray()) { |
|
332 |
final Array arrayValue = (Array) value; |
|
333 |
for (Object o : (Object[]) arrayValue.getArray()) { |
|
333 | 334 |
addValue(elem.addElement("ITEM"), o); |
334 | 335 |
} |
335 | 336 |
} catch (Exception e) { |
337 |
log.error(e); |
|
336 | 338 |
throw new DatabaseException("Error procsessing a Array", e); |
337 | 339 |
} |
338 | 340 |
} else if (value != null) { |
Also available in: Unified diff
fixing connection closed errors with updated postgresql driver