Revision 50753
Added by Claudio Atzori about 6 years ago
DatabaseUtils.java | ||
---|---|---|
23 | 23 |
import eu.dnetlib.enabling.database.rmi.DatabaseException; |
24 | 24 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
25 | 25 |
import eu.dnetlib.miscutils.functional.string.Sanitizer; |
26 |
import org.apache.commons.dbcp2.BasicDataSource; |
|
27 | 26 |
import org.apache.commons.lang.BooleanUtils; |
28 | 27 |
import org.apache.commons.logging.Log; |
29 | 28 |
import org.apache.commons.logging.LogFactory; |
... | ... | |
154 | 153 |
|
155 | 154 |
final ArrayBlockingQueue<Map<String, Object>> queue = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE); |
156 | 155 |
Executors.newSingleThreadExecutor().submit(() -> { |
157 |
|
|
158 |
try (final Connection con = getJdbcTemplate(dbName).getDataSource().getConnection();
|
|
156 |
final DataSource ds = dataSourceFactory.createDataSource(dbName); |
|
157 |
try (final Connection con = getConnection(ds);
|
|
159 | 158 |
final PreparedStatement stm = getStm(query, con); |
160 | 159 |
final ResultSet rs = stm.executeQuery()) { |
161 |
con.setAutoCommit(false); |
|
160 |
|
|
162 | 161 |
rs.setFetchSize(getFetchSize()); |
163 | 162 |
boolean timeout = false; |
164 |
log.info(String.format("starting to populate queue T-id %s", Thread.currentThread().getId()));
|
|
163 |
log.info(String.format("[Thread Id %s] starting to populate queue", Thread.currentThread().getId()));
|
|
165 | 164 |
while (rs.next()) { |
166 | 165 |
ResultSetMetaData md = rs.getMetaData(); |
167 | 166 |
Map<String, Object> row = new HashMap<>(); |
... | ... | |
174 | 173 |
} |
175 | 174 |
} |
176 | 175 |
if (timeout) { |
177 |
throw new RuntimeException(String.format("queue full, consumer did not consume for %s seconds", BLOCKING_QUEUE_TIMEOUT)); |
|
176 |
log.warn(String.format("[Thread Id %s] queue full, consumer did not consume for %s seconds, I give up", |
|
177 |
Thread.currentThread().getId(), BLOCKING_QUEUE_TIMEOUT)); |
|
178 |
return; |
|
178 | 179 |
} |
179 | 180 |
// An empty Map indicates the end of the resultset |
180 | 181 |
enqueue(queue, new HashMap<>()); |
... | ... | |
188 | 189 |
return (T) queue; |
189 | 190 |
} |
190 | 191 |
|
191 |
try(BasicDataSource ds = (BasicDataSource) dataSourceFactory.createDataSource(dbName)) { |
|
192 |
final JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(ds); |
|
193 |
if (clazz == Integer.class) return (T) jdbcTemplate.queryForObject(query, Integer.class); |
|
194 |
else if (clazz == List.class) return (T) jdbcTemplate.queryForList(query); |
|
195 |
else if (clazz == Map.class) return (T) jdbcTemplate.queryForMap(query); |
|
196 |
else if (clazz == SqlRowSet.class) return (T) jdbcTemplate.queryForRowSet(query); |
|
197 |
else { |
|
198 |
jdbcTemplate.update(query); |
|
199 |
return null; |
|
200 |
} |
|
201 |
} catch (SQLException e) { |
|
202 |
throw new DatabaseException(e); |
|
192 |
final JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(dbName); |
|
193 |
if (clazz == Integer.class) return (T) jdbcTemplate.queryForObject(query, Integer.class); |
|
194 |
else if (clazz == List.class) return (T) jdbcTemplate.queryForList(query); |
|
195 |
else if (clazz == Map.class) return (T) jdbcTemplate.queryForMap(query); |
|
196 |
else if (clazz == SqlRowSet.class) return (T) jdbcTemplate.queryForRowSet(query); |
|
197 |
else { |
|
198 |
jdbcTemplate.update(query); |
|
199 |
return null; |
|
203 | 200 |
} |
204 | 201 |
} |
205 | 202 |
|
... | ... | |
218 | 215 |
return stm; |
219 | 216 |
} |
220 | 217 |
|
218 |
private Connection getConnection(final DataSource dataSource) throws SQLException { |
|
219 |
final Connection conn = dataSource.getConnection(); |
|
220 |
conn.setAutoCommit(false); |
|
221 |
return conn; |
|
222 |
} |
|
223 |
|
|
221 | 224 |
public boolean contains(final String db, final String table, final String column, final String value) throws DatabaseException { |
222 | 225 |
String query = ""; |
223 | 226 |
try { |
Also available in: Unified diff
experimenting with BasicDatasource(s) from commons-dbcp2