Revision 46995
Added by Claudio Atzori almost 7 years ago
DatabaseUtils.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.enabling.database.utils; |
2 | 2 |
|
3 | 3 |
import java.io.StringReader; |
4 |
import java.sql.Array;
|
|
4 |
import java.sql.*;
|
|
5 | 5 |
import java.sql.Date; |
6 |
import java.sql.ResultSetMetaData; |
|
7 | 6 |
import java.text.ParseException; |
8 | 7 |
import java.text.SimpleDateFormat; |
9 | 8 |
import java.util.*; |
... | ... | |
25 | 24 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
26 | 25 |
import eu.dnetlib.miscutils.functional.string.Sanitizer; |
27 | 26 |
import org.apache.commons.lang.BooleanUtils; |
28 |
import org.apache.commons.lang.StringUtils; |
|
29 | 27 |
import org.apache.commons.logging.Log; |
30 | 28 |
import org.apache.commons.logging.LogFactory; |
31 | 29 |
import org.apache.velocity.app.VelocityEngine; |
... | ... | |
39 | 37 |
import org.springframework.beans.factory.annotation.Required; |
40 | 38 |
import org.springframework.dao.DataAccessException; |
41 | 39 |
import org.springframework.jdbc.core.JdbcTemplate; |
40 |
import org.springframework.jdbc.core.RowCallbackHandler; |
|
42 | 41 |
import org.springframework.jdbc.support.rowset.SqlRowSet; |
42 |
import org.springframework.transaction.TransactionStatus; |
|
43 |
import org.springframework.transaction.support.TransactionCallback; |
|
43 | 44 |
import org.springframework.transaction.support.TransactionTemplate; |
44 | 45 |
import org.springframework.ui.velocity.VelocityEngineUtils; |
45 | 46 |
|
... | ... | |
74 | 75 |
return getTypedListFromSql(database, query, String.class); |
75 | 76 |
} |
76 | 77 |
|
78 |
public Map<String, TableDates> getTableDatesForDB(final String db) throws DatabaseException { |
|
79 |
Map<String, TableDates> res = new HashMap<String, TableDates>(); |
|
80 |
|
|
81 |
for (String table : listCommonDBTables(db)) { |
|
82 |
try { |
|
83 |
TableDates dates = new TableDates(); |
|
84 |
|
|
85 |
String query = |
|
86 |
"select lastinsert, lastupdate, lastdelete from " + "(select max(date) as lastinsert from " + table |
|
87 |
+ "_log where operation='insert') as t1, " + "(select max(date) as lastupdate from " + table |
|
88 |
+ "_log where operation='update') as t2, " + "(select max(date) as lastdelete from " + table |
|
89 |
+ "_log where operation='delete') as t3"; |
|
90 |
|
|
91 |
SqlRowSet srs = executeSql(db, query, SqlRowSet.class); |
|
92 |
if (srs.next()) { |
|
93 |
dates.setLastInsert(srs.getDate("lastinsert")); |
|
94 |
dates.setLastUpdate(srs.getDate("lastupdate")); |
|
95 |
dates.setLastDelete(srs.getDate("lastdelete")); |
|
96 |
} |
|
97 |
res.put(table, dates); |
|
98 |
} catch (Exception e) { |
|
99 |
log.warn("Error obtaing dates for table " + table, e); |
|
100 |
} |
|
101 |
} |
|
102 |
return res; |
|
103 |
} |
|
104 |
|
|
77 | 105 |
public List<DnetDatabase> listAllDatabases() throws DatabaseException { |
78 | 106 |
final String query = |
79 | 107 |
"SELECT d.datname AS db, COALESCE(dsc.description,'')='isManaged' AS managed FROM pg_database d LEFT OUTER JOIN pg_shdescription dsc ON (d.oid = dsc.objoid) WHERE d.datname LIKE '" |
... | ... | |
107 | 135 |
JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(dbName); |
108 | 136 |
|
109 | 137 |
try { |
110 |
List<String> list = new ArrayList<>(); |
|
138 |
List<String> list = new ArrayList<String>();
|
|
111 | 139 |
for (Object obj : jdbcTemplate.queryForList(query)) { |
112 | 140 |
list.add(obj.toString()); |
113 | 141 |
} |
... | ... | |
135 | 163 |
|
136 | 164 |
final ArrayBlockingQueue<Map<String, Object>> q = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE); |
137 | 165 |
|
138 |
Executors.newSingleThreadExecutor().submit(() -> { |
|
139 |
try { |
|
140 |
jdbcTemplate.query(query, rs -> { |
|
166 |
Runnable run = new Runnable() { |
|
141 | 167 |
|
142 |
ResultSetMetaData md = rs.getMetaData(); |
|
143 |
Map<String, Object> row = new HashMap<>(); |
|
144 |
for (int i = 1; i <= md.getColumnCount(); i++) { |
|
145 |
row.put(md.getColumnName(i), rs.getObject(i)); |
|
146 |
} |
|
147 |
try { |
|
148 |
if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) { |
|
149 |
log.warn("The consumer doesn't consume my queue, I stop"); |
|
150 |
rs.close(); |
|
151 |
return; |
|
168 |
@Override |
|
169 |
public void run() { |
|
170 |
try { |
|
171 |
jdbcTemplate.query(query, getRowCallback(q)); |
|
172 |
} catch (Throwable e) { |
|
173 |
log.error("Exception executing SQL", e); |
|
174 |
throw new RuntimeException(e); |
|
175 |
} |
|
176 |
try { |
|
177 |
// An empty Map indicates the end of the resultset |
|
178 |
q.offer(new HashMap<String, Object>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS); |
|
179 |
} catch (InterruptedException e) { |
|
180 |
log.error("Error putting LAST element in queue"); |
|
181 |
throw new RuntimeException(e); |
|
182 |
} |
|
183 |
log.debug(" -- End of Sql Resultset"); |
|
184 |
} |
|
185 |
|
|
186 |
private RowCallbackHandler getRowCallback(final BlockingQueue<Map<String, Object>> q) { |
|
187 |
return new RowCallbackHandler() { |
|
188 |
|
|
189 |
@Override |
|
190 |
public void processRow(final ResultSet rs) throws SQLException { |
|
191 |
|
|
192 |
ResultSetMetaData md = rs.getMetaData(); |
|
193 |
Map<String, Object> row = new HashMap<String, Object>(); |
|
194 |
for (int i = 1; i <= md.getColumnCount(); i++) { |
|
195 |
row.put(md.getColumnName(i), rs.getObject(i)); |
|
152 | 196 |
} |
153 |
log.debug("Putted element in queue"); |
|
154 |
} catch (InterruptedException e) { |
|
155 |
log.error("Error putting element in queue"); |
|
156 |
throw new RuntimeException(e); |
|
197 |
try { |
|
198 |
if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) { |
|
199 |
log.warn("The consumer doesn't consume my queue, I stop"); |
|
200 |
rs.close(); |
|
201 |
return; |
|
202 |
} |
|
203 |
log.debug("Putted element in queue"); |
|
204 |
} catch (InterruptedException e) { |
|
205 |
log.error("Error putting element in queue"); |
|
206 |
throw new RuntimeException(e); |
|
207 |
} |
|
157 | 208 |
} |
158 |
}); |
|
159 |
} catch (Throwable e) { |
|
160 |
log.error("Exception executing SQL", e); |
|
161 |
throw new RuntimeException(e); |
|
209 |
}; |
|
162 | 210 |
} |
163 |
try { |
|
164 |
// An empty Map indicates the end of the resultset |
|
165 |
q.offer(new HashMap<>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS); |
|
166 |
} catch (InterruptedException e) { |
|
167 |
log.error("Error putting LAST element in queue"); |
|
168 |
throw new RuntimeException(e); |
|
169 |
} |
|
170 |
log.debug(" -- End of Sql Resultset"); |
|
171 |
}); |
|
211 |
}; |
|
212 |
Executors.newSingleThreadExecutor().submit(run); |
|
172 | 213 |
|
173 | 214 |
log.debug("Returned Queue"); |
174 | 215 |
|
... | ... | |
199 | 240 |
|
200 | 241 |
try { |
201 | 242 |
JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database); |
202 |
List<Map<?, ?>> response = new ArrayList<>(); |
|
243 |
List<Map<?, ?>> response = new ArrayList<Map<?, ?>>();
|
|
203 | 244 |
String query = "SELECT * FROM information_schema.columns WHERE table_name = ?"; |
204 | 245 |
|
205 | 246 |
for (Object o : jdbcTemplate.queryForList(query, new Object[] { table })) { |
... | ... | |
327 | 368 |
|
328 | 369 |
long start = DateUtils.now(); |
329 | 370 |
|
330 |
final List<GenericRow> rows = new ArrayList<>();
|
|
331 |
for (String s : iterable) {
|
|
332 |
rows.addAll(obtainListOfRows(s));
|
|
371 |
List<GenericRow> rows = new ArrayList<GenericRow>();
|
|
372 |
for (String prof : iterable) {
|
|
373 |
rows.addAll(obtainListOfRows(prof));
|
|
333 | 374 |
if (rows.size() > numbersOfRecordsForTransaction) { |
334 | 375 |
counterTotal += rows.size(); |
335 | 376 |
importTransaction(jdbcTemplate, transactionTemplate, rows); |
... | ... | |
357 | 398 |
final TransactionTemplate transactionTemplate, |
358 | 399 |
final List<GenericRow> rows) throws DatabaseException { |
359 | 400 |
|
360 |
final AtomicReference<DatabaseException> error = new AtomicReference<>(); |
|
401 |
final AtomicReference<DatabaseException> error = new AtomicReference<DatabaseException>();
|
|
361 | 402 |
|
362 | 403 |
try { |
363 |
return transactionTemplate.execute(status -> { |
|
364 |
final List<GenericRow> ok = Lists.newArrayList(); |
|
365 |
try { |
|
366 |
for (GenericRow row : rows) { |
|
367 |
if (row.isToDelete()) { |
|
368 |
deleteRow(jdbcTemplate, row); |
|
369 |
} else { |
|
370 |
addOrUpdateRow(jdbcTemplate, row); |
|
404 |
return transactionTemplate.execute(new TransactionCallback<List<GenericRow>>() { |
|
405 |
|
|
406 |
@Override |
|
407 |
public List<GenericRow> doInTransaction(final TransactionStatus status) { |
|
408 |
final List<GenericRow> ok = Lists.newArrayList(); |
|
409 |
try { |
|
410 |
for (GenericRow row : rows) { |
|
411 |
if (row.isToDelete()) { |
|
412 |
deleteRow(jdbcTemplate, row.getTable(), row.getFields()); |
|
413 |
} else { |
|
414 |
addOrUpdateRow(jdbcTemplate, row.getTable(), row.getFields()); |
|
415 |
} |
|
416 |
ok.add(row); |
|
371 | 417 |
} |
372 |
ok.add(row); |
|
418 |
} catch (DatabaseException e) { |
|
419 |
log.warn("Transaction failed", e); |
|
420 |
status.setRollbackOnly(); |
|
421 |
error.set(e); |
|
373 | 422 |
} |
374 |
} catch (DatabaseException e) { |
|
375 |
log.warn("Transaction failed", e); |
|
376 |
status.setRollbackOnly(); |
|
377 |
error.set(e); |
|
423 |
return ok; |
|
378 | 424 |
} |
379 |
return ok; |
|
380 | 425 |
}); |
381 | 426 |
} finally { |
382 | 427 |
if (error.get() != null) { |
... | ... | |
385 | 430 |
} |
386 | 431 |
} |
387 | 432 |
|
388 |
protected void addOrUpdateRow(final JdbcTemplate jdbcTemplate, final GenericRow row) throws DatabaseException {
|
|
433 |
protected void addOrUpdateRow(final JdbcTemplate jdbcTemplate, final String table, final Map<String, Object> rowFields) throws DatabaseException {
|
|
389 | 434 |
try { |
390 | 435 |
|
391 | 436 |
if (log.isDebugEnabled()) { |
392 |
log.debug("Adding or updating element to table " + row.getTable());
|
|
437 |
log.debug("Adding or updating element to table " + table);
|
|
393 | 438 |
} |
394 |
verifyParameters(row.getTable());
|
|
395 |
verifyParameters(row.getFields().keySet().toArray(new String[row.getFields().size()]));
|
|
439 |
verifyParameters(table);
|
|
440 |
verifyParameters(rowFields.keySet().toArray(new String[rowFields.size()]));
|
|
396 | 441 |
|
397 |
final List<Map<String, Object>> res = |
|
398 |
jdbcTemplate.queryForList("SELECT 1 FROM datasources d WHERE d.id = ? and d.managed is true", row.getDatasourceId()); |
|
399 |
|
|
400 |
if (res != null && !res.isEmpty()) { |
|
401 |
log.debug(String.format("avoiding to update managed datasource %s", row.getDatasourceId())); |
|
402 |
return; |
|
403 |
} |
|
404 |
|
|
405 | 442 |
String fields = ""; |
406 | 443 |
String values = ""; |
407 |
List<Object> list = new ArrayList<>(); |
|
444 |
List<Object> list = new ArrayList<Object>();
|
|
408 | 445 |
|
409 |
for (Map.Entry<String, Object> e : row.getFields().entrySet()) {
|
|
446 |
for (Map.Entry<String, Object> e : rowFields.entrySet()) {
|
|
410 | 447 |
if (!fields.isEmpty()) { |
411 | 448 |
fields += ","; |
412 | 449 |
} |
... | ... | |
419 | 456 |
} |
420 | 457 |
|
421 | 458 |
int count = 0; |
422 |
if (row.getFields().containsKey(DNET_RESOURCE_ID_FIELD)) {
|
|
423 |
List<Object> list2 = new ArrayList<>(); |
|
459 |
if (rowFields.containsKey(DNET_RESOURCE_ID_FIELD)) {
|
|
460 |
List<Object> list2 = new ArrayList<Object>();
|
|
424 | 461 |
list2.addAll(list); |
425 |
list2.add(row.getFields().get(DNET_RESOURCE_ID_FIELD)); |
|
426 |
|
|
427 |
count = jdbcTemplate.update(String.format("UPDATE %s SET (%s) = (%s) WHERE %s = ?", row.getTable(), fields, values, DNET_RESOURCE_ID_FIELD), list2.toArray()); |
|
462 |
list2.add(rowFields.get(DNET_RESOURCE_ID_FIELD)); |
|
463 |
count = |
|
464 |
jdbcTemplate.update("UPDATE " + table + " SET (" + fields + ") = (" + values + ") WHERE " + DNET_RESOURCE_ID_FIELD + "=?", |
|
465 |
list2.toArray()); |
|
428 | 466 |
} |
429 | 467 |
if (count == 0) { |
430 |
jdbcTemplate.update(String.format("INSERT INTO %s (%s) VALUES (%s)", row.getTable(), fields, values), list.toArray());
|
|
468 |
jdbcTemplate.update("INSERT INTO " + table + " (" + fields + ") VALUES (" + values + ")", list.toArray());
|
|
431 | 469 |
} |
432 | 470 |
} catch (final Exception e) { |
433 | 471 |
throw new DatabaseException("Error adding or updating record", e); |
434 | 472 |
} |
435 | 473 |
} |
436 | 474 |
|
437 |
protected void deleteRow(final JdbcTemplate jdbcTemplate, final GenericRow row) throws DatabaseException {
|
|
475 |
protected void deleteRow(final JdbcTemplate jdbcTemplate, final String table, final Map<String, Object> rowFields) throws DatabaseException {
|
|
438 | 476 |
if (log.isDebugEnabled()) { |
439 |
log.debug("Deleting element from table " + row.getTable());
|
|
477 |
log.debug("Deleting element from table " + table);
|
|
440 | 478 |
} |
441 |
verifyParameters(row.getTable());
|
|
442 |
verifyParameters(row.getFields().keySet().toArray(new String[row.getFields().size()]));
|
|
479 |
verifyParameters(table);
|
|
480 |
verifyParameters(rowFields.keySet().toArray(new String[rowFields.size()]));
|
|
443 | 481 |
|
444 |
List<Object> list = new ArrayList<>(); |
|
482 |
List<Object> list = new ArrayList<Object>();
|
|
445 | 483 |
|
446 | 484 |
String where = ""; |
447 | 485 |
|
448 |
for (Map.Entry<String, Object> e : row.getFields().entrySet()) {
|
|
486 |
for (Map.Entry<String, Object> e : rowFields.entrySet()) {
|
|
449 | 487 |
if (!where.isEmpty()) { |
450 | 488 |
where += " AND "; |
451 | 489 |
} |
... | ... | |
454 | 492 |
} |
455 | 493 |
|
456 | 494 |
if (where.isEmpty()) throw new DatabaseException("Delete condition is empty"); |
457 |
int n = jdbcTemplate.update("DELETE FROM " + row.getTable() + " WHERE " + where, list.toArray());
|
|
495 |
int n = jdbcTemplate.update("DELETE FROM " + table + " WHERE " + where, list.toArray());
|
|
458 | 496 |
|
459 | 497 |
if (log.isDebugEnabled()) { |
460 | 498 |
log.debug("Number of Deleted records: " + n); |
... | ... | |
505 | 543 |
table, DNET_RESOURCE_ID_FIELD) == 1; |
506 | 544 |
} |
507 | 545 |
|
546 |
public boolean isLoggedTable(final String database, final String table) throws DatabaseException { |
|
547 |
verifyParameters(database, table); |
|
548 |
JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database); |
|
549 |
return isLoggedTable(jdbcTemplate, table); |
|
550 |
} |
|
551 |
|
|
552 |
private boolean isLoggedTable(final JdbcTemplate jdbcTemplate, final String table) { |
|
553 |
return jdbcTemplate.queryForObject("SELECT count(*) FROM information_schema.tables WHERE table_name = ?", Integer.class, |
|
554 |
table + "_log") == 1; |
|
555 |
} |
|
556 |
|
|
508 | 557 |
public String getDefaultDnetIdentifier(final String database, final String table) throws DatabaseException { |
509 | 558 |
verifyParameters(database, table); |
510 | 559 |
JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database); |
... | ... | |
541 | 590 |
|
542 | 591 |
public String getSQLFromTemplate(final String sqlTemplate, final String db, final String table, Map<String, Object> map) { |
543 | 592 |
if (map == null) { |
544 |
map = new HashMap<>(); |
|
593 |
map = new HashMap<String, Object>();
|
|
545 | 594 |
} |
546 | 595 |
|
547 | 596 |
map.put("mainDB", defaultDB); |
... | ... | |
554 | 603 |
|
555 | 604 |
public List<GenericRow> obtainListOfRows(final String xml) throws DatabaseException { |
556 | 605 |
try { |
557 |
final Document doc = new SAXReader().read(new StringReader(xml));
|
|
606 |
Document doc = new SAXReader().read(new StringReader(xml)); |
|
558 | 607 |
|
559 |
final String datasourceId = doc.valueOf("//ROWS/ROW[@table='datasources']/FIELD[@name='id']/text()"); |
|
560 |
if (StringUtils.isBlank(datasourceId)) { |
|
561 |
throw new DatabaseException("datasource Id is missing, check your mappings!"); |
|
562 |
} |
|
608 |
List<GenericRow> list = new ArrayList<GenericRow>(); |
|
563 | 609 |
|
564 |
final List<GenericRow> list = Lists.newArrayList(); |
|
565 |
|
|
566 | 610 |
for (Object or : doc.selectNodes("//ROW")) { |
567 | 611 |
Element row = (Element) or; |
568 | 612 |
|
... | ... | |
611 | 655 |
} |
612 | 656 |
} |
613 | 657 |
|
614 |
list.add(new GenericRow(datasourceId, table, fields, toDelete));
|
|
658 |
list.add(new GenericRow(table, fields, toDelete)); |
|
615 | 659 |
} |
616 | 660 |
return list; |
617 | 661 |
} catch (Exception e) { |
... | ... | |
710 | 754 |
this.defaultDB = defaultDB; |
711 | 755 |
} |
712 | 756 |
|
757 |
public class TableDates { |
|
758 |
|
|
759 |
private Date lastInsert; |
|
760 |
private Date lastUpdate; |
|
761 |
private Date lastDelete; |
|
762 |
|
|
763 |
public Date getLastInsert() { |
|
764 |
return lastInsert; |
|
765 |
} |
|
766 |
|
|
767 |
public void setLastInsert(final Date lastInsert) { |
|
768 |
this.lastInsert = lastInsert; |
|
769 |
} |
|
770 |
|
|
771 |
public Date getLastUpdate() { |
|
772 |
return lastUpdate; |
|
773 |
} |
|
774 |
|
|
775 |
public void setLastUpdate(final Date lastUpdate) { |
|
776 |
this.lastUpdate = lastUpdate; |
|
777 |
} |
|
778 |
|
|
779 |
public Date getLastDelete() { |
|
780 |
return lastDelete; |
|
781 |
} |
|
782 |
|
|
783 |
public void setLastDelete(final Date lastDelete) { |
|
784 |
this.lastDelete = lastDelete; |
|
785 |
} |
|
786 |
} |
|
787 |
|
|
713 | 788 |
} |
Also available in: Unified diff
reverted to r46768