Project

General

Profile

« Previous | Next » 

Revision 50981

fixing connection closed errors with updated postgresql driver

View differences:

modules/cnr-enabling-database-service/trunk/src/main/java/eu/dnetlib/enabling/database/utils/DatabaseUtils.java
151 151
		if (clazz == BlockingQueue.class) {
152 152
			log.debug("Creating Queue");
153 153

  
154
			final ArrayBlockingQueue<Map<String, Object>> queue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
154
			final ArrayBlockingQueue<Document> queue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
155 155
			Executors.newSingleThreadExecutor().submit(() -> {
156 156
				final DataSource ds = dataSourceFactory.createDataSource(dbName);
157 157
				try (final Connection con = getConnection(ds);
......
179 179
					}
180 180
					// An empty Map indicates the end of the resultset
181 181
					enqueue(queue, new HashMap<>());
182
				} catch (SQLException e) {
182
				} catch (SQLException | DatabaseException e) {
183 183
					throw new RuntimeException(e);
184 184
				}
185 185
			});
......
200 200
		}
201 201
	}
202 202

  
203
	private boolean enqueue(final ArrayBlockingQueue<Map<String, Object>> q, final Map<String, Object> row) {
203
	private boolean enqueue(final ArrayBlockingQueue<Document> q, final Map<String, Object> row) throws DatabaseException {
204 204
		try {
205
			return q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
205
			return q.offer(rowToDocument(row), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
206 206
		} catch (InterruptedException e) {
207 207
			log.error("Error putting element in queue");
208 208
			throw new RuntimeException(e);
......
286 286
	}
287 287

  
288 288
	public Document rowToDocument(final Map<?, ?> map) throws DatabaseException {
289
		Document doc = DocumentHelper.createDocument();
289
		final Document doc = DocumentHelper.createDocument();
290
		final Element row = doc.addElement("ROW");
290 291

  
291
		Element row = doc.addElement("ROW");
292 292
		for (Map.Entry<?, ?> entry : map.entrySet()) {
293 293
			Element col = row.addElement("FIELD");
294 294
			col.addAttribute("name", "" + entry.getKey());
......
329 329
	private void addValue(final Element elem, final Object value) throws DatabaseException {
330 330
		if (value instanceof Array) {
331 331
			try {
332
				for (Object o : (Object[]) ((Array) value).getArray()) {
332
				final Array arrayValue = (Array) value;
333
				for (Object o : (Object[]) arrayValue.getArray()) {
333 334
					addValue(elem.addElement("ITEM"), o);
334 335
				}
335 336
			} catch (Exception e) {
337
				log.error(e);
336 338
				throw new DatabaseException("Error procsessing a Array", e);
337 339
			}
338 340
		} else if (value != null) {
modules/cnr-enabling-database-service/trunk/src/main/java/eu/dnetlib/enabling/database/resultset/IterableRowSet.java
1 1
package eu.dnetlib.enabling.database.resultset;
2 2

  
3 3
import java.util.Iterator;
4
import java.util.Map;
5 4
import java.util.NoSuchElementException;
6 5
import java.util.concurrent.BlockingQueue;
7 6
import java.util.concurrent.TimeUnit;
8 7

  
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11

  
12 8
import eu.dnetlib.enabling.database.rmi.DatabaseException;
13 9
import eu.dnetlib.enabling.database.utils.DatabaseUtils;
14 10
import eu.dnetlib.enabling.resultset.SizedIterable;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.dom4j.Document;
15 14

  
16 15
public class IterableRowSet implements SizedIterable<String> {
17 16

  
......
35 34
	@SuppressWarnings("unchecked")
36 35
	public Iterator<String> iterator() {
37 36
		try {
38
			final BlockingQueue<Map<String, Object>> queue = dbUtils.executeSql(db, sql, BlockingQueue.class);
37
			final BlockingQueue<Document> queue = dbUtils.executeSql(db, sql, BlockingQueue.class);
39 38
			return new QueueIterator(queue);
40 39
		} catch (Exception e) {
41 40
			throw new RuntimeException("Error creating iterator for query: " + sql, e);
......
61 60

  
62 61
	private class QueueIterator implements Iterator<String> {
63 62

  
64
		private Map<String, Object> curr;
65
		private BlockingQueue<Map<String, Object>> queue;
63
		private Document curr;
64
		private BlockingQueue<Document> queue;
66 65

  
67
		public QueueIterator(final BlockingQueue<Map<String, Object>> queue) throws InterruptedException {
66
		public QueueIterator(final BlockingQueue<Document> queue) throws InterruptedException {
68 67
			super();
69 68
			this.queue = queue;
70 69
			this.curr = queue.poll(DatabaseUtils.BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
......
75 74
			log.debug("Reading Next Element from queue");
76 75

  
77 76
			try {
78
				if ((curr == null) || curr.isEmpty()) { throw new NoSuchElementException(
77
				if ((curr == null) || !hasContent(curr)) { throw new NoSuchElementException(
79 78
						"Value from queue is null or empty, probably the producer doesn't produce"); }
80
				String res = dbUtils.rowToDocument(curr).asXML();
79
				String res = curr.asXML();
81 80
				curr = queue.poll(DatabaseUtils.BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
82 81
				return res;
83 82
			} catch (Exception e) {
......
85 84
			}
86 85
		}
87 86

  
87
		private boolean hasContent(final Document curr) {
88
			return curr != null && Integer.parseInt(curr.valueOf("count(/ROW/*)")) > 0;
89
		}
90

  
88 91
		@Override
89 92
		public boolean hasNext() {
90 93
			// An empty Map indicates the end of the Queue
91
			return (curr != null) && !curr.isEmpty();
94
			return (curr != null) && hasContent(curr);
92 95
		}
93 96

  
94 97
		@Override

Also available in: Unified diff