1 |
1 |
package eu.dnetlib.enabling.database.utils;
|
2 |
2 |
|
3 |
3 |
import java.io.StringReader;
|
4 |
|
import java.sql.Array;
|
|
4 |
import java.sql.*;
|
5 |
5 |
import java.sql.Date;
|
6 |
|
import java.sql.ResultSetMetaData;
|
7 |
6 |
import java.text.ParseException;
|
8 |
7 |
import java.text.SimpleDateFormat;
|
9 |
8 |
import java.util.*;
|
... | ... | |
38 |
37 |
import org.springframework.beans.factory.annotation.Required;
|
39 |
38 |
import org.springframework.dao.DataAccessException;
|
40 |
39 |
import org.springframework.jdbc.core.JdbcTemplate;
|
41 |
|
import org.springframework.jdbc.core.RowCallbackHandler;
|
42 |
40 |
import org.springframework.jdbc.support.rowset.SqlRowSet;
|
43 |
41 |
import org.springframework.transaction.support.TransactionTemplate;
|
44 |
42 |
import org.springframework.ui.velocity.VelocityEngineUtils;
|
... | ... | |
151 |
149 |
public <T> T executeSql(final String dbName, final String query, final Class<T> clazz) throws DatabaseException {
|
152 |
150 |
|
153 |
151 |
final JdbcTemplate jdbcTemplate = getJdbcTemplate(dbName);
|
154 |
|
|
155 |
152 |
try {
|
156 |
153 |
if (clazz == Integer.class) return (T) jdbcTemplate.queryForObject(query, Integer.class);
|
157 |
154 |
else if (clazz == List.class) return (T) jdbcTemplate.queryForList(query);
|
... | ... | |
161 |
158 |
log.debug("Creating Queue");
|
162 |
159 |
|
163 |
160 |
final ArrayBlockingQueue<Map<String, Object>> q = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
|
164 |
|
|
165 |
|
Runnable run = new Runnable() {
|
166 |
|
|
167 |
|
@Override
|
168 |
|
public void run() {
|
169 |
|
try {
|
170 |
|
jdbcTemplate.query(query, getRowCallback(q));
|
171 |
|
} catch (Throwable e) {
|
172 |
|
log.error("Exception executing SQL", e);
|
173 |
|
throw new RuntimeException(e);
|
174 |
|
}
|
175 |
|
try {
|
176 |
|
// An empty Map indicates the end of the resultset
|
177 |
|
q.offer(new HashMap<>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
|
178 |
|
} catch (InterruptedException e) {
|
179 |
|
log.error("Error putting LAST element in queue");
|
180 |
|
throw new RuntimeException(e);
|
181 |
|
}
|
182 |
|
log.debug(" -- End of Sql Resultset");
|
183 |
|
}
|
184 |
|
|
185 |
|
private RowCallbackHandler getRowCallback(final BlockingQueue<Map<String, Object>> q) {
|
186 |
|
return rs -> {
|
187 |
|
|
188 |
|
ResultSetMetaData md = rs.getMetaData();
|
189 |
|
Map<String, Object> row = new HashMap<>();
|
190 |
|
for (int i = 1; i <= md.getColumnCount(); i++) {
|
191 |
|
row.put(md.getColumnName(i), rs.getObject(i));
|
192 |
|
}
|
193 |
|
try {
|
194 |
|
if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) {
|
195 |
|
log.warn("The consumer doesn't consume my queue, I stop");
|
196 |
|
rs.close();
|
197 |
|
return;
|
|
161 |
Executors.newSingleThreadExecutor().submit(() -> {
|
|
162 |
try {
|
|
163 |
final PreparedStatement stm = getStm(query, getConnection(jdbcTemplate));
|
|
164 |
try (final ResultSet rs = stm.executeQuery()) {
|
|
165 |
rs.setFetchSize(getFetchSize());
|
|
166 |
while (rs.next()) {
|
|
167 |
ResultSetMetaData md = rs.getMetaData();
|
|
168 |
Map<String, Object> row = new HashMap<>();
|
|
169 |
for (int i = 1; i <= md.getColumnCount(); i++) {
|
|
170 |
row.put(md.getColumnName(i), rs.getObject(i));
|
198 |
171 |
}
|
199 |
|
log.debug("Putted element in queue");
|
200 |
|
} catch (InterruptedException e) {
|
201 |
|
log.error("Error putting element in queue");
|
202 |
|
throw new RuntimeException(e);
|
|
172 |
if (!enqueue(q, row)) {
|
|
173 |
break;
|
|
174 |
}
|
203 |
175 |
}
|
204 |
|
};
|
|
176 |
} finally {
|
|
177 |
getConnection(jdbcTemplate).setAutoCommit(true);
|
|
178 |
}
|
|
179 |
// An empty Map indicates the end of the resultset
|
|
180 |
enqueue(q, new HashMap<>());
|
|
181 |
} catch (SQLException e) {
|
|
182 |
throw new RuntimeException(e);
|
205 |
183 |
}
|
206 |
|
};
|
207 |
|
Executors.newSingleThreadExecutor().submit(run);
|
|
184 |
});
|
208 |
185 |
|
209 |
186 |
log.debug("Returned Queue");
|
210 |
187 |
|
... | ... | |
218 |
195 |
}
|
219 |
196 |
}
|
220 |
197 |
|
|
198 |
private boolean enqueue(final ArrayBlockingQueue<Map<String, Object>> q, final Map<String, Object> row) {
|
|
199 |
try {
|
|
200 |
return q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
|
|
201 |
} catch (InterruptedException e) {
|
|
202 |
log.error("Error putting element in queue");
|
|
203 |
throw new RuntimeException(e);
|
|
204 |
}
|
|
205 |
}
|
|
206 |
|
|
207 |
private PreparedStatement getStm(final String query, final Connection con) throws SQLException {
|
|
208 |
final PreparedStatement stm = con.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY);
|
|
209 |
stm.setFetchSize(getFetchSize());
|
|
210 |
return stm;
|
|
211 |
}
|
|
212 |
|
|
213 |
private Connection getConnection(final JdbcTemplate jdbcTemplate) throws SQLException {
|
|
214 |
final Connection conn = jdbcTemplate.getDataSource().getConnection();
|
|
215 |
conn.setAutoCommit(false);
|
|
216 |
return conn;
|
|
217 |
}
|
|
218 |
|
221 |
219 |
public boolean contains(final String db, final String table, final String column, final String value) throws DatabaseException {
|
222 |
220 |
String query = "";
|
223 |
221 |
try {
|
... | ... | |
761 |
759 |
|
762 |
760 |
public class TableDates {
|
763 |
761 |
|
764 |
|
private Date lastInsert;
|
765 |
|
private Date lastUpdate;
|
766 |
|
private Date lastDelete;
|
|
762 |
private java.sql.Date lastInsert;
|
|
763 |
private java.sql.Date lastUpdate;
|
|
764 |
private java.sql.Date lastDelete;
|
767 |
765 |
|
768 |
|
public Date getLastInsert() {
|
|
766 |
public java.sql.Date getLastInsert() {
|
769 |
767 |
return lastInsert;
|
770 |
768 |
}
|
771 |
769 |
|
avoid to use jdbcTemplate as it tries to load the entire resultset in memory