Project

General

Profile

« Previous | Next » 

Revision 46771

added openaire specific behaviour: do not update the datasource related information when the datasource is managed

View differences:

modules/cnr-enabling-database-service/trunk/src/main/java/eu/dnetlib/enabling/database/utils/GenericRow.java
3 3
import java.util.Map;
4 4

  
5 5
public class GenericRow {
6
	private String datasourceId;
6 7
	private String table;
7 8
	private Map<String, Object> fields;
8 9
	private boolean toDelete = false;
9
	
10
	
11
	public GenericRow(String table, Map<String, Object> fields, boolean toDelete) {
12
		super();
10

  
11
	public GenericRow(final String datasourceId, String table, Map<String, Object> fields, boolean toDelete) {
12
		this.datasourceId = datasourceId;
13 13
		this.table = table;
14 14
		this.fields = fields;
15 15
		this.toDelete = toDelete;
16 16
	}
17
	
17

  
18
	public String getDatasourceId() {
19
		return datasourceId;
20
	}
21

  
22
	public void setDatasourceId(final String datasourceId) {
23
		this.datasourceId = datasourceId;
24
	}
25

  
18 26
	public String getTable() {
19 27
		return table;
20 28
	}
modules/cnr-enabling-database-service/trunk/src/main/java/eu/dnetlib/enabling/database/utils/DatabaseUtils.java
1 1
package eu.dnetlib.enabling.database.utils;
2 2

  
3 3
import java.io.StringReader;
4
import java.sql.*;
4
import java.sql.Array;
5 5
import java.sql.Date;
6
import java.sql.ResultSetMetaData;
6 7
import java.text.ParseException;
7 8
import java.text.SimpleDateFormat;
8 9
import java.util.*;
......
24 25
import eu.dnetlib.miscutils.datetime.DateUtils;
25 26
import eu.dnetlib.miscutils.functional.string.Sanitizer;
26 27
import org.apache.commons.lang.BooleanUtils;
28
import org.apache.commons.lang.StringUtils;
27 29
import org.apache.commons.logging.Log;
28 30
import org.apache.commons.logging.LogFactory;
29 31
import org.apache.velocity.app.VelocityEngine;
......
37 39
import org.springframework.beans.factory.annotation.Required;
38 40
import org.springframework.dao.DataAccessException;
39 41
import org.springframework.jdbc.core.JdbcTemplate;
40
import org.springframework.jdbc.core.RowCallbackHandler;
41 42
import org.springframework.jdbc.support.rowset.SqlRowSet;
42
import org.springframework.transaction.TransactionStatus;
43
import org.springframework.transaction.support.TransactionCallback;
44 43
import org.springframework.transaction.support.TransactionTemplate;
45 44
import org.springframework.ui.velocity.VelocityEngineUtils;
46 45

  
......
75 74
		return getTypedListFromSql(database, query, String.class);
76 75
	}
77 76

  
78
	public Map<String, TableDates> getTableDatesForDB(final String db) throws DatabaseException {
79
		Map<String, TableDates> res = new HashMap<String, TableDates>();
80

  
81
		for (String table : listCommonDBTables(db)) {
82
			try {
83
				TableDates dates = new TableDates();
84

  
85
				String query =
86
						"select lastinsert, lastupdate, lastdelete from " + "(select max(date) as lastinsert from " + table
87
								+ "_log where operation='insert') as t1, " + "(select max(date) as lastupdate from " + table
88
								+ "_log where operation='update') as t2, " + "(select max(date) as lastdelete from " + table
89
								+ "_log where operation='delete') as t3";
90

  
91
				SqlRowSet srs = executeSql(db, query, SqlRowSet.class);
92
				if (srs.next()) {
93
					dates.setLastInsert(srs.getDate("lastinsert"));
94
					dates.setLastUpdate(srs.getDate("lastupdate"));
95
					dates.setLastDelete(srs.getDate("lastdelete"));
96
				}
97
				res.put(table, dates);
98
			} catch (Exception e) {
99
				log.warn("Error obtaing dates for table " + table, e);
100
			}
101
		}
102
		return res;
103
	}
104

  
105 77
	public List<DnetDatabase> listAllDatabases() throws DatabaseException {
106 78
		final String query =
107 79
				"SELECT d.datname AS db, COALESCE(dsc.description,'')='isManaged' AS managed FROM pg_database d LEFT OUTER JOIN pg_shdescription dsc ON (d.oid = dsc.objoid) WHERE d.datname LIKE '"
......
135 107
		JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(dbName);
136 108

  
137 109
		try {
138
			List<String> list = new ArrayList<String>();
110
			List<String> list = new ArrayList<>();
139 111
			for (Object obj : jdbcTemplate.queryForList(query)) {
140 112
				list.add(obj.toString());
141 113
			}
......
163 135

  
164 136
				final ArrayBlockingQueue<Map<String, Object>> q = Queues.newArrayBlockingQueue(BLOCKING_QUEUE_SIZE);
165 137

  
166
				Runnable run = new Runnable() {
138
				Executors.newSingleThreadExecutor().submit(() -> {
139
					try {
140
						jdbcTemplate.query(query, rs -> {
167 141

  
168
					@Override
169
					public void run() {
170
						try {
171
							jdbcTemplate.query(query, getRowCallback(q));
172
						} catch (Throwable e) {
173
							log.error("Exception executing SQL", e);
174
							throw new RuntimeException(e);
175
						}
176
						try {
177
							// An empty Map indicates the end of the resultset
178
							q.offer(new HashMap<String, Object>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
179
						} catch (InterruptedException e) {
180
							log.error("Error putting LAST element in queue");
181
							throw new RuntimeException(e);
182
						}
183
						log.debug(" -- End of Sql Resultset");
184
					}
185

  
186
					private RowCallbackHandler getRowCallback(final BlockingQueue<Map<String, Object>> q) {
187
						return new RowCallbackHandler() {
188

  
189
							@Override
190
							public void processRow(final ResultSet rs) throws SQLException {
191

  
192
								ResultSetMetaData md = rs.getMetaData();
193
								Map<String, Object> row = new HashMap<String, Object>();
194
								for (int i = 1; i <= md.getColumnCount(); i++) {
195
									row.put(md.getColumnName(i), rs.getObject(i));
142
							ResultSetMetaData md = rs.getMetaData();
143
							Map<String, Object> row = new HashMap<>();
144
							for (int i = 1; i <= md.getColumnCount(); i++) {
145
								row.put(md.getColumnName(i), rs.getObject(i));
146
							}
147
							try {
148
								if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) {
149
									log.warn("The consumer doesn't consume my queue, I stop");
150
									rs.close();
151
									return;
196 152
								}
197
								try {
198
									if (!rs.isClosed() && !q.offer(row, BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS)) {
199
										log.warn("The consumer doesn't consume my queue, I stop");
200
										rs.close();
201
										return;
202
									}
203
									log.debug("Putted element in queue");
204
								} catch (InterruptedException e) {
205
									log.error("Error putting element in queue");
206
									throw new RuntimeException(e);
207
								}
153
								log.debug("Putted element in queue");
154
							} catch (InterruptedException e) {
155
								log.error("Error putting element in queue");
156
								throw new RuntimeException(e);
208 157
							}
209
						};
158
						});
159
					} catch (Throwable e) {
160
						log.error("Exception executing SQL", e);
161
						throw new RuntimeException(e);
210 162
					}
211
				};
212
				Executors.newSingleThreadExecutor().submit(run);
163
					try {
164
						// An empty Map indicates the end of the resultset
165
						q.offer(new HashMap<>(), BLOCKING_QUEUE_TIMEOUT, TimeUnit.SECONDS);
166
					} catch (InterruptedException e) {
167
						log.error("Error putting LAST element in queue");
168
						throw new RuntimeException(e);
169
					}
170
					log.debug(" -- End of Sql Resultset");
171
				});
213 172

  
214 173
				log.debug("Returned Queue");
215 174

  
......
240 199

  
241 200
		try {
242 201
			JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
243
			List<Map<?, ?>> response = new ArrayList<Map<?, ?>>();
202
			List<Map<?, ?>> response = new ArrayList<>();
244 203
			String query = "SELECT * FROM information_schema.columns WHERE table_name = ?";
245 204

  
246 205
			for (Object o : jdbcTemplate.queryForList(query, new Object[] { table })) {
......
368 327

  
369 328
		long start = DateUtils.now();
370 329

  
371
		List<GenericRow> rows = new ArrayList<GenericRow>();
372
		for (String prof : iterable) {
373
			rows.addAll(obtainListOfRows(prof));
330
		final List<GenericRow> rows = new ArrayList<>();
331
		for (String s : iterable) {
332
			rows.addAll(obtainListOfRows(s));
374 333
			if (rows.size() > numbersOfRecordsForTransaction) {
375 334
				counterTotal += rows.size();
376 335
				importTransaction(jdbcTemplate, transactionTemplate, rows);
......
398 357
			final TransactionTemplate transactionTemplate,
399 358
			final List<GenericRow> rows) throws DatabaseException {
400 359

  
401
		final AtomicReference<DatabaseException> error = new AtomicReference<DatabaseException>();
360
		final AtomicReference<DatabaseException> error = new AtomicReference<>();
402 361

  
403 362
		try {
404
			return transactionTemplate.execute(new TransactionCallback<List<GenericRow>>() {
405

  
406
				@Override
407
				public List<GenericRow> doInTransaction(final TransactionStatus status) {
408
					final List<GenericRow> ok = Lists.newArrayList();
409
					try {
410
						for (GenericRow row : rows) {
411
							if (row.isToDelete()) {
412
								deleteRow(jdbcTemplate, row.getTable(), row.getFields());
413
							} else {
414
								addOrUpdateRow(jdbcTemplate, row.getTable(), row.getFields());
415
							}
416
							ok.add(row);
363
			return transactionTemplate.execute(status -> {
364
				final List<GenericRow> ok = Lists.newArrayList();
365
				try {
366
					for (GenericRow row : rows) {
367
						if (row.isToDelete()) {
368
							deleteRow(jdbcTemplate, row);
369
						} else {
370
							addOrUpdateRow(jdbcTemplate, row);
417 371
						}
418
					} catch (DatabaseException e) {
419
						log.warn("Transaction failed", e);
420
						status.setRollbackOnly();
421
						error.set(e);
372
						ok.add(row);
422 373
					}
423
					return ok;
374
				} catch (DatabaseException e) {
375
					log.warn("Transaction failed", e);
376
					status.setRollbackOnly();
377
					error.set(e);
424 378
				}
379
				return ok;
425 380
			});
426 381
		} finally {
427 382
			if (error.get() != null) {
......
430 385
		}
431 386
	}
432 387

  
433
	protected void addOrUpdateRow(final JdbcTemplate jdbcTemplate, final String table, final Map<String, Object> rowFields) throws DatabaseException {
388
	protected void addOrUpdateRow(final JdbcTemplate jdbcTemplate, final GenericRow row) throws DatabaseException {
434 389
		try {
435 390

  
436 391
			if (log.isDebugEnabled()) {
437
				log.debug("Adding or updating element to table " + table);
392
				log.debug("Adding or updating element to table " + row.getTable());
438 393
			}
439
			verifyParameters(table);
440
			verifyParameters(rowFields.keySet().toArray(new String[rowFields.size()]));
394
			verifyParameters(row.getTable());
395
			verifyParameters(row.getFields().keySet().toArray(new String[row.getFields().size()]));
441 396

  
397
			final List<Map<String, Object>> res =
398
					jdbcTemplate.queryForList("SELECT 1 FROM datasources d WHERE d.id = ? and d.managed is true", row.getDatasourceId());
399

  
400
			if (res != null && !res.isEmpty()) {
401
				log.debug(String.format("avoiding to update managed datasource %s", row.getDatasourceId()));
402
				return;
403
			}
404

  
442 405
			String fields = "";
443 406
			String values = "";
444
			List<Object> list = new ArrayList<Object>();
407
			List<Object> list = new ArrayList<>();
445 408

  
446
			for (Map.Entry<String, Object> e : rowFields.entrySet()) {
409
			for (Map.Entry<String, Object> e : row.getFields().entrySet()) {
447 410
				if (!fields.isEmpty()) {
448 411
					fields += ",";
449 412
				}
......
456 419
			}
457 420

  
458 421
			int count = 0;
459
			if (rowFields.containsKey(DNET_RESOURCE_ID_FIELD)) {
460
				List<Object> list2 = new ArrayList<Object>();
422
			if (row.getFields().containsKey(DNET_RESOURCE_ID_FIELD)) {
423
				List<Object> list2 = new ArrayList<>();
461 424
				list2.addAll(list);
462
				list2.add(rowFields.get(DNET_RESOURCE_ID_FIELD));
463
				count =
464
						jdbcTemplate.update("UPDATE " + table + " SET (" + fields + ") = (" + values + ") WHERE " + DNET_RESOURCE_ID_FIELD + "=?",
465
								list2.toArray());
425
				list2.add(row.getFields().get(DNET_RESOURCE_ID_FIELD));
426

  
427
				count = jdbcTemplate.update(String.format("UPDATE %s SET (%s) = (%s) WHERE %s = ?", row.getTable(), fields, values, DNET_RESOURCE_ID_FIELD), list2.toArray());
466 428
			}
467 429
			if (count == 0) {
468
				jdbcTemplate.update("INSERT INTO " + table + " (" + fields + ") VALUES (" + values + ")", list.toArray());
430
				jdbcTemplate.update(String.format("INSERT INTO %s (%s) VALUES (%s)", row.getTable(), fields, values), list.toArray());
469 431
			}
470 432
		} catch (final Exception e) {
471 433
			throw new DatabaseException("Error adding or updating record", e);
472 434
		}
473 435
	}
474 436

  
475
	protected void deleteRow(final JdbcTemplate jdbcTemplate, final String table, final Map<String, Object> rowFields) throws DatabaseException {
437
	protected void deleteRow(final JdbcTemplate jdbcTemplate, final GenericRow row) throws DatabaseException {
476 438
		if (log.isDebugEnabled()) {
477
			log.debug("Deleting element from table " + table);
439
			log.debug("Deleting element from table " + row.getTable());
478 440
		}
479
		verifyParameters(table);
480
		verifyParameters(rowFields.keySet().toArray(new String[rowFields.size()]));
441
		verifyParameters(row.getTable());
442
		verifyParameters(row.getFields().keySet().toArray(new String[row.getFields().size()]));
481 443

  
482
		List<Object> list = new ArrayList<Object>();
444
		List<Object> list = new ArrayList<>();
483 445

  
484 446
		String where = "";
485 447

  
486
		for (Map.Entry<String, Object> e : rowFields.entrySet()) {
448
		for (Map.Entry<String, Object> e : row.getFields().entrySet()) {
487 449
			if (!where.isEmpty()) {
488 450
				where += " AND ";
489 451
			}
......
492 454
		}
493 455

  
494 456
		if (where.isEmpty()) throw new DatabaseException("Delete condition is empty");
495
		int n = jdbcTemplate.update("DELETE FROM " + table + " WHERE " + where, list.toArray());
457
		int n = jdbcTemplate.update("DELETE FROM " + row.getTable() + " WHERE " + where, list.toArray());
496 458

  
497 459
		if (log.isDebugEnabled()) {
498 460
			log.debug("Number of Deleted records: " + n);
......
543 505
				table, DNET_RESOURCE_ID_FIELD) == 1;
544 506
	}
545 507

  
546
	public boolean isLoggedTable(final String database, final String table) throws DatabaseException {
547
		verifyParameters(database, table);
548
		JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
549
		return isLoggedTable(jdbcTemplate, table);
550
	}
551

  
552
	private boolean isLoggedTable(final JdbcTemplate jdbcTemplate, final String table) {
553
		return jdbcTemplate.queryForObject("SELECT count(*) FROM information_schema.tables WHERE table_name = ?", Integer.class,
554
				table + "_log") == 1;
555
	}
556

  
557 508
	public String getDefaultDnetIdentifier(final String database, final String table) throws DatabaseException {
558 509
		verifyParameters(database, table);
559 510
		JdbcTemplate jdbcTemplate = jdbcTemplateFactory.createJdbcTemplate(database);
......
590 541

  
591 542
	public String getSQLFromTemplate(final String sqlTemplate, final String db, final String table, Map<String, Object> map) {
592 543
		if (map == null) {
593
			map = new HashMap<String, Object>();
544
			map = new HashMap<>();
594 545
		}
595 546

  
596 547
		map.put("mainDB", defaultDB);
......
603 554

  
604 555
	public List<GenericRow> obtainListOfRows(final String xml) throws DatabaseException {
605 556
		try {
606
			Document doc = new SAXReader().read(new StringReader(xml));
557
			final Document doc = new SAXReader().read(new StringReader(xml));
607 558

  
608
			List<GenericRow> list = new ArrayList<GenericRow>();
559
			final String datasourceId = doc.valueOf("//ROWS/ROW[@table='datasources']/FIELD[@name='id']/text()");
560
			if (StringUtils.isBlank(datasourceId)) {
561
				throw new DatabaseException("datasource Id is missing, check your mappings!");
562
			}
609 563

  
564
			final List<GenericRow> list = Lists.newArrayList();
565

  
610 566
			for (Object or : doc.selectNodes("//ROW")) {
611 567
				Element row = (Element) or;
612 568

  
......
655 611
					}
656 612
				}
657 613

  
658
				list.add(new GenericRow(table, fields, toDelete));
614
				list.add(new GenericRow(datasourceId, table, fields, toDelete));
659 615
			}
660 616
			return list;
661 617
		} catch (Exception e) {
......
754 710
		this.defaultDB = defaultDB;
755 711
	}
756 712

  
757
	public class TableDates {
758

  
759
		private Date lastInsert;
760
		private Date lastUpdate;
761
		private Date lastDelete;
762

  
763
		public Date getLastInsert() {
764
			return lastInsert;
765
		}
766

  
767
		public void setLastInsert(final Date lastInsert) {
768
			this.lastInsert = lastInsert;
769
		}
770

  
771
		public Date getLastUpdate() {
772
			return lastUpdate;
773
		}
774

  
775
		public void setLastUpdate(final Date lastUpdate) {
776
			this.lastUpdate = lastUpdate;
777
		}
778

  
779
		public Date getLastDelete() {
780
			return lastDelete;
781
		}
782

  
783
		public void setLastDelete(final Date lastDelete) {
784
			this.lastDelete = lastDelete;
785
		}
786
	}
787

  
788 713
}

Also available in: Unified diff