Project

General

Profile

« Previous | Next » 

Revision 46995

reverted to r46768

View differences:

DatabaseUtils.java
1 1
package eu.dnetlib.enabling.database.utils;
2 2

  
3 3
import java.io.StringReader;
4
import java.sql.Array;
4
import java.sql.*;
5 5
import java.sql.Date;
6
import java.sql.ResultSetMetaData;
7 6
import java.text.ParseException;
8 7
import java.text.SimpleDateFormat;
9 8
import java.util.*;
......
25 24
import eu.dnetlib.miscutils.datetime.DateUtils;
26 25
import eu.dnetlib.miscutils.functional.string.Sanitizer;
27 26
import org.apache.commons.lang.BooleanUtils;
28
import org.apache.commons.lang.StringUtils;
29 27
import org.apache.commons.logging.Log;
30 28
import org.apache.commons.logging.LogFactory;
31 29
import org.apache.velocity.app.VelocityEngine;
......
39 37
import org.springframework.beans.factory.annotation.Required;
40 38
import org.springframework.dao.DataAccessException;
41 39
import org.springframework.jdbc.core.JdbcTemplate;
40
import org.springframework.jdbc.core.RowCallbackHandler;
42 41
import org.springframework.jdbc.support.rowset.SqlRowSet;
42
import org.springframework.transaction.TransactionStatus;
43
import org.springframework.transaction.support.TransactionCallback;
43 44
import org.springframework.transaction.support.TransactionTemplate;
44 45
import org.springframework.ui.velocity.VelocityEngineUtils;
45 46

  
......
74 75
		return getTypedListFromSql(database, query, String.class);
75 76
	}
76 77

  
78
	public Map<String, TableDates> getTableDatesForDB(final String db) throws DatabaseException {
79
		Map<String, TableDates> res = new HashMap<String, TableDates>();
80

  
81
		for (String table : listCommonDBTables(db)) {
82
			try {
83
				TableDates dates = new TableDates();
84

  
85
				String query =
86
						"select lastinsert, lastupdate, lastdelete from " + "(select max(date) as lastinsert from " + table
87
								+ "_log where operation='insert') as t1, " + "(select max(date) as lastupdate from " + table
88
								+ "_log where operation='update') as t2, " + "(select max(date) as lastdelete from " + table
89
								+ "_log where operation='delete') as t3";
90

  
91
				SqlRowSet srs = executeSql(db, query, SqlRowSet.class);
92
				if (srs.next()) {
93
					dates.setLastInsert(srs.getDate("lastinsert"));
94
					dates.setLastUpdate(srs.getDate("lastupdate"));
95
					dates.setLastDelete(srs.getDate("lastdelete"));
96
				}
97
				res.put(table, dates);
98
			} catch (Exception e) {
99
				log.warn("Error obtaing dates for table " + table, e);
100
			}
101
		}
102
		return res;
103
	}
104

  
77 105
	public List<DnetDatabase> listAllDatabases() throws DatabaseException {
78 106
		final String query =
79 107
				"SELECT d.datname AS db, COALESCE(dsc.description,'')='isManaged' AS managed FROM pg_database d LEFT OUTER JOIN pg_shdescription dsc ON (d.oid = dsc.objoid) WHERE d.datname LIKE '"
......
107 135
		JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(dbName);
108 136

  
109 137
		try {
110
			List<String> list = new ArrayList<>();
138
			List<String> list = new ArrayList<String>();
111 139
			for (Object obj : jdbcTemplate.queryForList(query)) {
112 140
				list.add(obj.toString());
113 141
			}
......
135 163

  
136 164
				final ArrayBlockingQueue<Map<String, Object>> q = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
137 165

  
138
				Executors.newSingleThreadExecutor().submit(() -> {
139
					try {
140
						jdbcTemplate.query(query, rs -> {
166
				Runnable run = new Runnable() {
141 167

  
142
							ResultSetMetaData md = rs.getMetaData();
143
							Map<String, Object> row = new HashMap<>();
144
							for (int i = 1; i <= md.getColumnCount(); i++) {
145
								row.put(md.getColumnName(i), rs.getObject(i));
146
							}
147
							try {
148
								if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) {
149
									log.warn("The consumer doesn't consume my queue, I stop");
150
									rs.close();
151
									return;
168
					@Override
169
					public void run() {
170
						try {
171
							jdbcTemplate.query(query, getRowCallback(q));
172
						} catch (Throwable e) {
173
							log.error("Exception executing SQL", e);
174
							throw new RuntimeException(e);
175
						}
176
						try {
177
							// An empty Map indicates the end of the resultset
178
							q.offer(new HashMap<String, Object>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
179
						} catch (InterruptedException e) {
180
							log.error("Error putting LAST element in queue");
181
							throw new RuntimeException(e);
182
						}
183
						log.debug(" -- End of Sql Resultset");
184
					}
185

  
186
					private RowCallbackHandler getRowCallback(final BlockingQueue<Map<String, Object>> q) {
187
						return new RowCallbackHandler() {
188

  
189
							@Override
190
							public void processRow(final ResultSet rs) throws SQLException {
191

  
192
								ResultSetMetaData md = rs.getMetaData();
193
								Map<String, Object> row = new HashMap<String, Object>();
194
								for (int i = 1; i <= md.getColumnCount(); i++) {
195
									row.put(md.getColumnName(i), rs.getObject(i));
152 196
								}
153
								log.debug("Putted element in queue");
154
							} catch (InterruptedException e) {
155
								log.error("Error putting element in queue");
156
								throw new RuntimeException(e);
197
								try {
198
									if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) {
199
										log.warn("The consumer doesn't consume my queue, I stop");
200
										rs.close();
201
										return;
202
									}
203
									log.debug("Putted element in queue");
204
								} catch (InterruptedException e) {
205
									log.error("Error putting element in queue");
206
									throw new RuntimeException(e);
207
								}
157 208
							}
158
						});
159
					} catch (Throwable e) {
160
						log.error("Exception executing SQL", e);
161
						throw new RuntimeException(e);
209
						};
162 210
					}
163
					try {
164
						// An empty Map indicates the end of the resultset
165
						q.offer(new HashMap<>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
166
					} catch (InterruptedException e) {
167
						log.error("Error putting LAST element in queue");
168
						throw new RuntimeException(e);
169
					}
170
					log.debug(" -- End of Sql Resultset");
171
				});
211
				};
212
				Executors.newSingleThreadExecutor().submit(run);
172 213

  
173 214
				log.debug("Returned Queue");
174 215

  
......
199 240

  
200 241
		try {
201 242
			JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
202
			List<Map<?, ?>> response = new ArrayList<>();
243
			List<Map<?, ?>> response = new ArrayList<Map<?, ?>>();
203 244
			String query = "SELECT * FROM information_schema.columns WHERE table_name = ?";
204 245

  
205 246
			for (Object o : jdbcTemplate.queryForList(query, new Object[] { table })) {
......
327 368

  
328 369
		long start = DateUtils.now();
329 370

  
330
		final List<GenericRow> rows = new ArrayList<>();
331
		for (String s : iterable) {
332
			rows.addAll(obtainListOfRows(s));
371
		List<GenericRow> rows = new ArrayList<GenericRow>();
372
		for (String prof : iterable) {
373
			rows.addAll(obtainListOfRows(prof));
333 374
			if (rows.size() > numbersOfRecordsForTransaction) {
334 375
				counterTotal += rows.size();
335 376
				importTransaction(jdbcTemplate, transactionTemplate, rows);
......
357 398
			final TransactionTemplate transactionTemplate,
358 399
			final List<GenericRow> rows) throws DatabaseException {
359 400

  
360
		final AtomicReference<DatabaseException> error = new AtomicReference<>();
401
		final AtomicReference<DatabaseException> error = new AtomicReference<DatabaseException>();
361 402

  
362 403
		try {
363
			return transactionTemplate.execute(status -> {
364
				final List<GenericRow> ok = Lists.newArrayList();
365
				try {
366
					for (GenericRow row : rows) {
367
						if (row.isToDelete()) {
368
							deleteRow(jdbcTemplate, row);
369
						} else {
370
							addOrUpdateRow(jdbcTemplate, row);
404
			return transactionTemplate.execute(new TransactionCallback<List<GenericRow>>() {
405

  
406
				@Override
407
				public List<GenericRow> doInTransaction(final TransactionStatus status) {
408
					final List<GenericRow> ok = Lists.newArrayList();
409
					try {
410
						for (GenericRow row : rows) {
411
							if (row.isToDelete()) {
412
								deleteRow(jdbcTemplate, row.getTable(), row.getFields());
413
							} else {
414
								addOrUpdateRow(jdbcTemplate, row.getTable(), row.getFields());
415
							}
416
							ok.add(row);
371 417
						}
372
						ok.add(row);
418
					} catch (DatabaseException e) {
419
						log.warn("Transaction failed", e);
420
						status.setRollbackOnly();
421
						error.set(e);
373 422
					}
374
				} catch (DatabaseException e) {
375
					log.warn("Transaction failed", e);
376
					status.setRollbackOnly();
377
					error.set(e);
423
					return ok;
378 424
				}
379
				return ok;
380 425
			});
381 426
		} finally {
382 427
			if (error.get() != null) {
......
385 430
		}
386 431
	}
387 432

  
388
	protected void addOrUpdateRow(final JdbcTemplate jdbcTemplate, final GenericRow row) throws DatabaseException {
433
	protected void addOrUpdateRow(final JdbcTemplate jdbcTemplate, final String table, final Map<String, Object> rowFields) throws DatabaseException {
389 434
		try {
390 435

  
391 436
			if (log.isDebugEnabled()) {
392
				log.debug("Adding or updating element to table " + row.getTable());
437
				log.debug("Adding or updating element to table " + table);
393 438
			}
394
			verifyParameters(row.getTable());
395
			verifyParameters(row.getFields().keySet().toArray(new String[row.getFields().size()]));
439
			verifyParameters(table);
440
			verifyParameters(rowFields.keySet().toArray(new String[rowFields.size()]));
396 441

  
397
			final List<Map<String, Object>> res =
398
					jdbcTemplate.queryForList("SELECT 1 FROM datasources d WHERE d.id = ? and d.managed is true", row.getDatasourceId());
399

  
400
			if (res != null && !res.isEmpty()) {
401
				log.debug(String.format("avoiding to update managed datasource %s", row.getDatasourceId()));
402
				return;
403
			}
404

  
405 442
			String fields = "";
406 443
			String values = "";
407
			List<Object> list = new ArrayList<>();
444
			List<Object> list = new ArrayList<Object>();
408 445

  
409
			for (Map.Entry<String, Object> e : row.getFields().entrySet()) {
446
			for (Map.Entry<String, Object> e : rowFields.entrySet()) {
410 447
				if (!fields.isEmpty()) {
411 448
					fields += ",";
412 449
				}
......
419 456
			}
420 457

  
421 458
			int count = 0;
422
			if (row.getFields().containsKey(DNET_RESOURCE_ID_FIELD)) {
423
				List<Object> list2 = new ArrayList<>();
459
			if (rowFields.containsKey(DNET_RESOURCE_ID_FIELD)) {
460
				List<Object> list2 = new ArrayList<Object>();
424 461
				list2.addAll(list);
425
				list2.add(row.getFields().get(DNET_RESOURCE_ID_FIELD));
426

  
427
				count = jdbcTemplate.update(String.format("UPDATE %s SET (%s) = (%s) WHERE %s = ?", row.getTable(), fields, values, DNET_RESOURCE_ID_FIELD), list2.toArray());
462
				list2.add(rowFields.get(DNET_RESOURCE_ID_FIELD));
463
				count =
464
						jdbcTemplate.update("UPDATE " + table + " SET (" + fields + ") = (" + values + ") WHERE " + DNET_RESOURCE_ID_FIELD + "=?",
465
								list2.toArray());
428 466
			}
429 467
			if (count == 0) {
430
				jdbcTemplate.update(String.format("INSERT INTO %s (%s) VALUES (%s)", row.getTable(), fields, values), list.toArray());
468
				jdbcTemplate.update("INSERT INTO " + table + " (" + fields + ") VALUES (" + values + ")", list.toArray());
431 469
			}
432 470
		} catch (final Exception e) {
433 471
			throw new DatabaseException("Error adding or updating record", e);
434 472
		}
435 473
	}
436 474

  
437
	protected void deleteRow(final JdbcTemplate jdbcTemplate, final GenericRow row) throws DatabaseException {
475
	protected void deleteRow(final JdbcTemplate jdbcTemplate, final String table, final Map<String, Object> rowFields) throws DatabaseException {
438 476
		if (log.isDebugEnabled()) {
439
			log.debug("Deleting element from table " + row.getTable());
477
			log.debug("Deleting element from table " + table);
440 478
		}
441
		verifyParameters(row.getTable());
442
		verifyParameters(row.getFields().keySet().toArray(new String[row.getFields().size()]));
479
		verifyParameters(table);
480
		verifyParameters(rowFields.keySet().toArray(new String[rowFields.size()]));
443 481

  
444
		List<Object> list = new ArrayList<>();
482
		List<Object> list = new ArrayList<Object>();
445 483

  
446 484
		String where = "";
447 485

  
448
		for (Map.Entry<String, Object> e : row.getFields().entrySet()) {
486
		for (Map.Entry<String, Object> e : rowFields.entrySet()) {
449 487
			if (!where.isEmpty()) {
450 488
				where += " AND ";
451 489
			}
......
454 492
		}
455 493

  
456 494
		if (where.isEmpty()) throw new DatabaseException("Delete condition is empty");
457
		int n = jdbcTemplate.update("DELETE FROM " + row.getTable() + " WHERE " + where, list.toArray());
495
		int n = jdbcTemplate.update("DELETE FROM " + table + " WHERE " + where, list.toArray());
458 496

  
459 497
		if (log.isDebugEnabled()) {
460 498
			log.debug("Number of Deleted records: " + n);
......
505 543
				table, DNET_RESOURCE_ID_FIELD) == 1;
506 544
	}
507 545

  
546
	public boolean isLoggedTable(final String database, final String table) throws DatabaseException {
547
		verifyParameters(database, table);
548
		JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
549
		return isLoggedTable(jdbcTemplate, table);
550
	}
551

  
552
	private boolean isLoggedTable(final JdbcTemplate jdbcTemplate, final String table) {
553
		return jdbcTemplate.queryForObject("SELECT count(*) FROM information_schema.tables WHERE table_name = ?", Integer.class,
554
				table + "_log") == 1;
555
	}
556

  
508 557
	public String getDefaultDnetIdentifier(final String database, final String table) throws DatabaseException {
509 558
		verifyParameters(database, table);
510 559
		JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
......
541 590

  
542 591
	public String getSQLFromTemplate(final String sqlTemplate, final String db, final String table, Map<String, Object> map) {
543 592
		if (map == null) {
544
			map = new HashMap<>();
593
			map = new HashMap<String, Object>();
545 594
		}
546 595

  
547 596
		map.put("mainDB", defaultDB);
......
554 603

  
555 604
	public List<GenericRow> obtainListOfRows(final String xml) throws DatabaseException {
556 605
		try {
557
			final Document doc = new SAXReader().read(new StringReader(xml));
606
			Document doc = new SAXReader().read(new StringReader(xml));
558 607

  
559
			final String datasourceId = doc.valueOf("//ROWS/ROW[@table='datasources']/FIELD[@name='id']/text()");
560
			if (StringUtils.isBlank(datasourceId)) {
561
				throw new DatabaseException("datasource Id is missing, check your mappings!");
562
			}
608
			List<GenericRow> list = new ArrayList<GenericRow>();
563 609

  
564
			final List<GenericRow> list = Lists.newArrayList();
565

  
566 610
			for (Object or : doc.selectNodes("//ROW")) {
567 611
				Element row = (Element) or;
568 612

  
......
611 655
					}
612 656
				}
613 657

  
614
				list.add(new GenericRow(datasourceId, table, fields, toDelete));
658
				list.add(new GenericRow(table, fields, toDelete));
615 659
			}
616 660
			return list;
617 661
		} catch (Exception e) {
......
710 754
		this.defaultDB = defaultDB;
711 755
	}
712 756

  
757
	public class TableDates {
758

  
759
		private Date lastInsert;
760
		private Date lastUpdate;
761
		private Date lastDelete;
762

  
763
		public Date getLastInsert() {
764
			return lastInsert;
765
		}
766

  
767
		public void setLastInsert(final Date lastInsert) {
768
			this.lastInsert = lastInsert;
769
		}
770

  
771
		public Date getLastUpdate() {
772
			return lastUpdate;
773
		}
774

  
775
		public void setLastUpdate(final Date lastUpdate) {
776
			this.lastUpdate = lastUpdate;
777
		}
778

  
779
		public Date getLastDelete() {
780
			return lastDelete;
781
		}
782

  
783
		public void setLastDelete(final Date lastDelete) {
784
			this.lastDelete = lastDelete;
785
		}
786
	}
787

  
713 788
}

Also available in: Unified diff