1 |
1 |
package eu.dnetlib.enabling.database.utils;
|
2 |
2 |
|
3 |
3 |
import java.io.StringReader;
|
4 |
|
import java.sql.*;
|
|
4 |
import java.sql.Array;
|
5 |
5 |
import java.sql.Date;
|
|
6 |
import java.sql.ResultSetMetaData;
|
6 |
7 |
import java.text.ParseException;
|
7 |
8 |
import java.text.SimpleDateFormat;
|
8 |
9 |
import java.util.*;
|
... | ... | |
24 |
25 |
import eu.dnetlib.miscutils.datetime.DateUtils;
|
25 |
26 |
import eu.dnetlib.miscutils.functional.string.Sanitizer;
|
26 |
27 |
import org.apache.commons.lang.BooleanUtils;
|
|
28 |
import org.apache.commons.lang.StringUtils;
|
27 |
29 |
import org.apache.commons.logging.Log;
|
28 |
30 |
import org.apache.commons.logging.LogFactory;
|
29 |
31 |
import org.apache.velocity.app.VelocityEngine;
|
... | ... | |
37 |
39 |
import org.springframework.beans.factory.annotation.Required;
|
38 |
40 |
import org.springframework.dao.DataAccessException;
|
39 |
41 |
import org.springframework.jdbc.core.JdbcTemplate;
|
40 |
|
import org.springframework.jdbc.core.RowCallbackHandler;
|
41 |
42 |
import org.springframework.jdbc.support.rowset.SqlRowSet;
|
42 |
|
import org.springframework.transaction.TransactionStatus;
|
43 |
|
import org.springframework.transaction.support.TransactionCallback;
|
44 |
43 |
import org.springframework.transaction.support.TransactionTemplate;
|
45 |
44 |
import org.springframework.ui.velocity.VelocityEngineUtils;
|
46 |
45 |
|
... | ... | |
75 |
74 |
return getTypedListFromSql(database, query, String.class);
|
76 |
75 |
}
|
77 |
76 |
|
78 |
|
public Map<String, TableDates> getTableDatesForDB(final String db) throws DatabaseException {
|
79 |
|
Map<String, TableDates> res = new HashMap<String, TableDates>();
|
80 |
|
|
81 |
|
for (String table : listCommonDBTables(db)) {
|
82 |
|
try {
|
83 |
|
TableDates dates = new TableDates();
|
84 |
|
|
85 |
|
String query =
|
86 |
|
"select lastinsert, lastupdate, lastdelete from " + "(select max(date) as lastinsert from " + table
|
87 |
|
+ "_log where operation='insert') as t1, " + "(select max(date) as lastupdate from " + table
|
88 |
|
+ "_log where operation='update') as t2, " + "(select max(date) as lastdelete from " + table
|
89 |
|
+ "_log where operation='delete') as t3";
|
90 |
|
|
91 |
|
SqlRowSet srs = executeSql(db, query, SqlRowSet.class);
|
92 |
|
if (srs.next()) {
|
93 |
|
dates.setLastInsert(srs.getDate("lastinsert"));
|
94 |
|
dates.setLastUpdate(srs.getDate("lastupdate"));
|
95 |
|
dates.setLastDelete(srs.getDate("lastdelete"));
|
96 |
|
}
|
97 |
|
res.put(table, dates);
|
98 |
|
} catch (Exception e) {
|
99 |
|
log.warn("Error obtaing dates for table " + table, e);
|
100 |
|
}
|
101 |
|
}
|
102 |
|
return res;
|
103 |
|
}
|
104 |
|
|
105 |
77 |
public List<DnetDatabase> listAllDatabases() throws DatabaseException {
|
106 |
78 |
final String query =
|
107 |
79 |
"SELECT d.datname AS db, COALESCE(dsc.description,'')='isManaged' AS managed FROM pg_database d LEFT OUTER JOIN pg_shdescription dsc ON (d.oid = dsc.objoid) WHERE d.datname LIKE '"
|
... | ... | |
135 |
107 |
JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(dbName);
|
136 |
108 |
|
137 |
109 |
try {
|
138 |
|
List<String> list = new ArrayList<String>();
|
|
110 |
List<String> list = new ArrayList<>();
|
139 |
111 |
for (Object obj : jdbcTemplate.queryForList(query)) {
|
140 |
112 |
list.add(obj.toString());
|
141 |
113 |
}
|
... | ... | |
163 |
135 |
|
164 |
136 |
final ArrayBlockingQueue<Map<String, Object>> q = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
|
165 |
137 |
|
166 |
|
Runnable run = new Runnable() {
|
|
138 |
Executors.newSingleThreadExecutor().submit(() -> {
|
|
139 |
try {
|
|
140 |
jdbcTemplate.query(query, rs -> {
|
167 |
141 |
|
168 |
|
@Override
|
169 |
|
public void run() {
|
170 |
|
try {
|
171 |
|
jdbcTemplate.query(query, getRowCallback(q));
|
172 |
|
} catch (Throwable e) {
|
173 |
|
log.error("Exception executing SQL", e);
|
174 |
|
throw new RuntimeException(e);
|
175 |
|
}
|
176 |
|
try {
|
177 |
|
// An empty Map indicates the end of the resultset
|
178 |
|
q.offer(new HashMap<String, Object>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
|
179 |
|
} catch (InterruptedException e) {
|
180 |
|
log.error("Error putting LAST element in queue");
|
181 |
|
throw new RuntimeException(e);
|
182 |
|
}
|
183 |
|
log.debug(" -- End of Sql Resultset");
|
184 |
|
}
|
185 |
|
|
186 |
|
private RowCallbackHandler getRowCallback(final BlockingQueue<Map<String, Object>> q) {
|
187 |
|
return new RowCallbackHandler() {
|
188 |
|
|
189 |
|
@Override
|
190 |
|
public void processRow(final ResultSet rs) throws SQLException {
|
191 |
|
|
192 |
|
ResultSetMetaData md = rs.getMetaData();
|
193 |
|
Map<String, Object> row = new HashMap<String, Object>();
|
194 |
|
for (int i = 1; i <= md.getColumnCount(); i++) {
|
195 |
|
row.put(md.getColumnName(i), rs.getObject(i));
|
|
142 |
ResultSetMetaData md = rs.getMetaData();
|
|
143 |
Map<String, Object> row = new HashMap<>();
|
|
144 |
for (int i = 1; i <= md.getColumnCount(); i++) {
|
|
145 |
row.put(md.getColumnName(i), rs.getObject(i));
|
|
146 |
}
|
|
147 |
try {
|
|
148 |
if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) {
|
|
149 |
log.warn("The consumer doesn't consume my queue, I stop");
|
|
150 |
rs.close();
|
|
151 |
return;
|
196 |
152 |
}
|
197 |
|
try {
|
198 |
|
if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) {
|
199 |
|
log.warn("The consumer doesn't consume my queue, I stop");
|
200 |
|
rs.close();
|
201 |
|
return;
|
202 |
|
}
|
203 |
|
log.debug("Putted element in queue");
|
204 |
|
} catch (InterruptedException e) {
|
205 |
|
log.error("Error putting element in queue");
|
206 |
|
throw new RuntimeException(e);
|
207 |
|
}
|
|
153 |
log.debug("Putted element in queue");
|
|
154 |
} catch (InterruptedException e) {
|
|
155 |
log.error("Error putting element in queue");
|
|
156 |
throw new RuntimeException(e);
|
208 |
157 |
}
|
209 |
|
};
|
|
158 |
});
|
|
159 |
} catch (Throwable e) {
|
|
160 |
log.error("Exception executing SQL", e);
|
|
161 |
throw new RuntimeException(e);
|
210 |
162 |
}
|
211 |
|
};
|
212 |
|
Executors.newSingleThreadExecutor().submit(run);
|
|
163 |
try {
|
|
164 |
// An empty Map indicates the end of the resultset
|
|
165 |
q.offer(new HashMap<>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
|
|
166 |
} catch (InterruptedException e) {
|
|
167 |
log.error("Error putting LAST element in queue");
|
|
168 |
throw new RuntimeException(e);
|
|
169 |
}
|
|
170 |
log.debug(" -- End of Sql Resultset");
|
|
171 |
});
|
213 |
172 |
|
214 |
173 |
log.debug("Returned Queue");
|
215 |
174 |
|
... | ... | |
240 |
199 |
|
241 |
200 |
try {
|
242 |
201 |
JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
|
243 |
|
List<Map<?, ?>> response = new ArrayList<Map<?, ?>>();
|
|
202 |
List<Map<?, ?>> response = new ArrayList<>();
|
244 |
203 |
String query = "SELECT * FROM information_schema.columns WHERE table_name = ?";
|
245 |
204 |
|
246 |
205 |
for (Object o : jdbcTemplate.queryForList(query, new Object[] { table })) {
|
... | ... | |
368 |
327 |
|
369 |
328 |
long start = DateUtils.now();
|
370 |
329 |
|
371 |
|
List<GenericRow> rows = new ArrayList<GenericRow>();
|
372 |
|
for (String prof : iterable) {
|
373 |
|
rows.addAll(obtainListOfRows(prof));
|
|
330 |
final List<GenericRow> rows = new ArrayList<>();
|
|
331 |
for (String s : iterable) {
|
|
332 |
rows.addAll(obtainListOfRows(s));
|
374 |
333 |
if (rows.size() > numbersOfRecordsForTransaction) {
|
375 |
334 |
counterTotal += rows.size();
|
376 |
335 |
importTransaction(jdbcTemplate, transactionTemplate, rows);
|
... | ... | |
398 |
357 |
final TransactionTemplate transactionTemplate,
|
399 |
358 |
final List<GenericRow> rows) throws DatabaseException {
|
400 |
359 |
|
401 |
|
final AtomicReference<DatabaseException> error = new AtomicReference<DatabaseException>();
|
|
360 |
final AtomicReference<DatabaseException> error = new AtomicReference<>();
|
402 |
361 |
|
403 |
362 |
try {
|
404 |
|
return transactionTemplate.execute(new TransactionCallback<List<GenericRow>>() {
|
405 |
|
|
406 |
|
@Override
|
407 |
|
public List<GenericRow> doInTransaction(final TransactionStatus status) {
|
408 |
|
final List<GenericRow> ok = Lists.newArrayList();
|
409 |
|
try {
|
410 |
|
for (GenericRow row : rows) {
|
411 |
|
if (row.isToDelete()) {
|
412 |
|
deleteRow(jdbcTemplate, row.getTable(), row.getFields());
|
413 |
|
} else {
|
414 |
|
addOrUpdateRow(jdbcTemplate, row.getTable(), row.getFields());
|
415 |
|
}
|
416 |
|
ok.add(row);
|
|
363 |
return transactionTemplate.execute(status -> {
|
|
364 |
final List<GenericRow> ok = Lists.newArrayList();
|
|
365 |
try {
|
|
366 |
for (GenericRow row : rows) {
|
|
367 |
if (row.isToDelete()) {
|
|
368 |
deleteRow(jdbcTemplate, row);
|
|
369 |
} else {
|
|
370 |
addOrUpdateRow(jdbcTemplate, row);
|
417 |
371 |
}
|
418 |
|
} catch (DatabaseException e) {
|
419 |
|
log.warn("Transaction failed", e);
|
420 |
|
status.setRollbackOnly();
|
421 |
|
error.set(e);
|
|
372 |
ok.add(row);
|
422 |
373 |
}
|
423 |
|
return ok;
|
|
374 |
} catch (DatabaseException e) {
|
|
375 |
log.warn("Transaction failed", e);
|
|
376 |
status.setRollbackOnly();
|
|
377 |
error.set(e);
|
424 |
378 |
}
|
|
379 |
return ok;
|
425 |
380 |
});
|
426 |
381 |
} finally {
|
427 |
382 |
if (error.get() != null) {
|
... | ... | |
430 |
385 |
}
|
431 |
386 |
}
|
432 |
387 |
|
433 |
|
protected void addOrUpdateRow(final JdbcTemplate jdbcTemplate, final String table, final Map<String, Object> rowFields) throws DatabaseException {
|
|
388 |
protected void addOrUpdateRow(final JdbcTemplate jdbcTemplate, final GenericRow row) throws DatabaseException {
|
434 |
389 |
try {
|
435 |
390 |
|
436 |
391 |
if (log.isDebugEnabled()) {
|
437 |
|
log.debug("Adding or updating element to table " + table);
|
|
392 |
log.debug("Adding or updating element to table " + row.getTable());
|
438 |
393 |
}
|
439 |
|
verifyParameters(table);
|
440 |
|
verifyParameters(rowFields.keySet().toArray(new String[rowFields.size()]));
|
|
394 |
verifyParameters(row.getTable());
|
|
395 |
verifyParameters(row.getFields().keySet().toArray(new String[row.getFields().size()]));
|
441 |
396 |
|
|
397 |
final List<Map<String, Object>> res =
|
|
398 |
jdbcTemplate.queryForList("SELECT 1 FROM datasources d WHERE d.id = ? and d.managed is true", row.getDatasourceId());
|
|
399 |
|
|
400 |
if (res != null && !res.isEmpty()) {
|
|
401 |
log.debug(String.format("avoiding to update managed datasource %s", row.getDatasourceId()));
|
|
402 |
return;
|
|
403 |
}
|
|
404 |
|
442 |
405 |
String fields = "";
|
443 |
406 |
String values = "";
|
444 |
|
List<Object> list = new ArrayList<Object>();
|
|
407 |
List<Object> list = new ArrayList<>();
|
445 |
408 |
|
446 |
|
for (Map.Entry<String, Object> e : rowFields.entrySet()) {
|
|
409 |
for (Map.Entry<String, Object> e : row.getFields().entrySet()) {
|
447 |
410 |
if (!fields.isEmpty()) {
|
448 |
411 |
fields += ",";
|
449 |
412 |
}
|
... | ... | |
456 |
419 |
}
|
457 |
420 |
|
458 |
421 |
int count = 0;
|
459 |
|
if (rowFields.containsKey(DNET_RESOURCE_ID_FIELD)) {
|
460 |
|
List<Object> list2 = new ArrayList<Object>();
|
|
422 |
if (row.getFields().containsKey(DNET_RESOURCE_ID_FIELD)) {
|
|
423 |
List<Object> list2 = new ArrayList<>();
|
461 |
424 |
list2.addAll(list);
|
462 |
|
list2.add(rowFields.get(DNET_RESOURCE_ID_FIELD));
|
463 |
|
count =
|
464 |
|
jdbcTemplate.update("UPDATE " + table + " SET (" + fields + ") = (" + values + ") WHERE " + DNET_RESOURCE_ID_FIELD + "=?",
|
465 |
|
list2.toArray());
|
|
425 |
list2.add(row.getFields().get(DNET_RESOURCE_ID_FIELD));
|
|
426 |
|
|
427 |
count = jdbcTemplate.update(String.format("UPDATE %s SET (%s) = (%s) WHERE %s = ?", row.getTable(), fields, values, DNET_RESOURCE_ID_FIELD), list2.toArray());
|
466 |
428 |
}
|
467 |
429 |
if (count == 0) {
|
468 |
|
jdbcTemplate.update("INSERT INTO " + table + " (" + fields + ") VALUES (" + values + ")", list.toArray());
|
|
430 |
jdbcTemplate.update(String.format("INSERT INTO %s (%s) VALUES (%s)", row.getTable(), fields, values), list.toArray());
|
469 |
431 |
}
|
470 |
432 |
} catch (final Exception e) {
|
471 |
433 |
throw new DatabaseException("Error adding or updating record", e);
|
472 |
434 |
}
|
473 |
435 |
}
|
474 |
436 |
|
475 |
|
protected void deleteRow(final JdbcTemplate jdbcTemplate, final String table, final Map<String, Object> rowFields) throws DatabaseException {
|
|
437 |
protected void deleteRow(final JdbcTemplate jdbcTemplate, final GenericRow row) throws DatabaseException {
|
476 |
438 |
if (log.isDebugEnabled()) {
|
477 |
|
log.debug("Deleting element from table " + table);
|
|
439 |
log.debug("Deleting element from table " + row.getTable());
|
478 |
440 |
}
|
479 |
|
verifyParameters(table);
|
480 |
|
verifyParameters(rowFields.keySet().toArray(new String[rowFields.size()]));
|
|
441 |
verifyParameters(row.getTable());
|
|
442 |
verifyParameters(row.getFields().keySet().toArray(new String[row.getFields().size()]));
|
481 |
443 |
|
482 |
|
List<Object> list = new ArrayList<Object>();
|
|
444 |
List<Object> list = new ArrayList<>();
|
483 |
445 |
|
484 |
446 |
String where = "";
|
485 |
447 |
|
486 |
|
for (Map.Entry<String, Object> e : rowFields.entrySet()) {
|
|
448 |
for (Map.Entry<String, Object> e : row.getFields().entrySet()) {
|
487 |
449 |
if (!where.isEmpty()) {
|
488 |
450 |
where += " AND ";
|
489 |
451 |
}
|
... | ... | |
492 |
454 |
}
|
493 |
455 |
|
494 |
456 |
if (where.isEmpty()) throw new DatabaseException("Delete condition is empty");
|
495 |
|
int n = jdbcTemplate.update("DELETE FROM " + table + " WHERE " + where, list.toArray());
|
|
457 |
int n = jdbcTemplate.update("DELETE FROM " + row.getTable() + " WHERE " + where, list.toArray());
|
496 |
458 |
|
497 |
459 |
if (log.isDebugEnabled()) {
|
498 |
460 |
log.debug("Number of Deleted records: " + n);
|
... | ... | |
543 |
505 |
table, DNET_RESOURCE_ID_FIELD) == 1;
|
544 |
506 |
}
|
545 |
507 |
|
546 |
|
public boolean isLoggedTable(final String database, final String table) throws DatabaseException {
|
547 |
|
verifyParameters(database, table);
|
548 |
|
JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
|
549 |
|
return isLoggedTable(jdbcTemplate, table);
|
550 |
|
}
|
551 |
|
|
552 |
|
private boolean isLoggedTable(final JdbcTemplate jdbcTemplate, final String table) {
|
553 |
|
return jdbcTemplate.queryForObject("SELECT count(*) FROM information_schema.tables WHERE table_name = ?", Integer.class,
|
554 |
|
table + "_log") == 1;
|
555 |
|
}
|
556 |
|
|
557 |
508 |
public String getDefaultDnetIdentifier(final String database, final String table) throws DatabaseException {
|
558 |
509 |
verifyParameters(database, table);
|
559 |
510 |
JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
|
... | ... | |
590 |
541 |
|
591 |
542 |
public String getSQLFromTemplate(final String sqlTemplate, final String db, final String table, Map<String, Object> map) {
|
592 |
543 |
if (map == null) {
|
593 |
|
map = new HashMap<String, Object>();
|
|
544 |
map = new HashMap<>();
|
594 |
545 |
}
|
595 |
546 |
|
596 |
547 |
map.put("mainDB", defaultDB);
|
... | ... | |
603 |
554 |
|
604 |
555 |
public List<GenericRow> obtainListOfRows(final String xml) throws DatabaseException {
|
605 |
556 |
try {
|
606 |
|
Document doc = new SAXReader().read(new StringReader(xml));
|
|
557 |
final Document doc = new SAXReader().read(new StringReader(xml));
|
607 |
558 |
|
608 |
|
List<GenericRow> list = new ArrayList<GenericRow>();
|
|
559 |
final String datasourceId = doc.valueOf("//ROWS/ROW[@table='datasources']/FIELD[@name='id']/text()");
|
|
560 |
if (StringUtils.isBlank(datasourceId)) {
|
|
561 |
throw new DatabaseException("datasource Id is missing, check your mappings!");
|
|
562 |
}
|
609 |
563 |
|
|
564 |
final List<GenericRow> list = Lists.newArrayList();
|
|
565 |
|
610 |
566 |
for (Object or : doc.selectNodes("//ROW")) {
|
611 |
567 |
Element row = (Element) or;
|
612 |
568 |
|
... | ... | |
655 |
611 |
}
|
656 |
612 |
}
|
657 |
613 |
|
658 |
|
list.add(new GenericRow(table, fields, toDelete));
|
|
614 |
list.add(new GenericRow(datasourceId, table, fields, toDelete));
|
659 |
615 |
}
|
660 |
616 |
return list;
|
661 |
617 |
} catch (Exception e) {
|
... | ... | |
754 |
710 |
this.defaultDB = defaultDB;
|
755 |
711 |
}
|
756 |
712 |
|
757 |
|
public class TableDates {
|
758 |
|
|
759 |
|
private Date lastInsert;
|
760 |
|
private Date lastUpdate;
|
761 |
|
private Date lastDelete;
|
762 |
|
|
763 |
|
public Date getLastInsert() {
|
764 |
|
return lastInsert;
|
765 |
|
}
|
766 |
|
|
767 |
|
public void setLastInsert(final Date lastInsert) {
|
768 |
|
this.lastInsert = lastInsert;
|
769 |
|
}
|
770 |
|
|
771 |
|
public Date getLastUpdate() {
|
772 |
|
return lastUpdate;
|
773 |
|
}
|
774 |
|
|
775 |
|
public void setLastUpdate(final Date lastUpdate) {
|
776 |
|
this.lastUpdate = lastUpdate;
|
777 |
|
}
|
778 |
|
|
779 |
|
public Date getLastDelete() {
|
780 |
|
return lastDelete;
|
781 |
|
}
|
782 |
|
|
783 |
|
public void setLastDelete(final Date lastDelete) {
|
784 |
|
this.lastDelete = lastDelete;
|
785 |
|
}
|
786 |
|
}
|
787 |
|
|
788 |
713 |
}
|
added openaire specific behaviour: do not update the datasource related information when the datasource is managed