Project

General

Profile

1
package eu.dnetlib.validator.commons.dao.tasks;
2

    
3
import eu.dnetlib.validator.commons.dao.AbstractDAO;
4
import eu.dnetlib.validator.commons.dao.DaoException;
5
import eu.dnetlib.validator.commons.dao.rules.RuleStatus;
6

    
7
import java.sql.Connection;
8
import java.sql.PreparedStatement;
9
import java.sql.ResultSet;
10
import java.sql.SQLException;
11
import java.util.ArrayList;
12
import java.util.List;
13
import java.util.Map;
14
import java.util.Map.Entry;
15

    
16
public class TasksDAOimpl extends AbstractDAO<TaskStored> implements TasksDAO{
17

    
18
	@Override
19
	protected PreparedStatement getUpdateStatement(TaskStored t, Connection con)
20
			throws SQLException {
21
		String query="UPDATE tasks set status=?, success=?, started=?, ended=?, record_url=? WHERE job_id=? AND rule_id=? AND record_identifier=?";
22
		PreparedStatement stmt = con.prepareStatement(query);
23
//		logger.debug("getting taskStored updateStatement");
24
		stmt.setString(1, t.getStatus());
25
		stmt.setBoolean(2, t.getSuccess());
26
		stmt.setString(3, t.getStarted());
27
		stmt.setString(4, t.getEnded());
28
		stmt.setString(5, t.getRecordUrl());	
29
		stmt.setInt(6, t.getJobId());
30
		stmt.setInt(7, t.getRuleId());
31
		stmt.setString(8, t.getRecordIdentifier());
32
		return stmt;
33
	}
34

    
35
	@Override
36
	protected PreparedStatement getInsertStatement(TaskStored t, Connection con) throws SQLException {
37
		String query="INSERT INTO tasks(status, success, started, ended, job_id, rule_id, record_identifier, record_url) VALUES(?,?,?,?,?,?,?,?)";
38
		PreparedStatement stmt = con.prepareStatement(query);
39
//		logger.debug("getting taskStored insertStatement");
40
		stmt.setString(1, t.getStatus());
41
		stmt.setBoolean(2, t.getSuccess());
42
		stmt.setString(3, t.getStarted());
43
		stmt.setString(4, t.getEnded());
44
		stmt.setInt(5, t.getJobId());
45
		stmt.setInt(6, t.getRuleId());
46
		stmt.setString(7, t.getRecordIdentifier());
47
		stmt.setString(8, t.getRecordUrl());		
48
	
49
		return stmt;
50
	}
51

    
52
	@Override
53
	protected PreparedStatement getDeleteStatement(int id, Connection con)
54
			throws SQLException {
55
		String query="DELETE FROM tasks WHERE job_id=?";
56
		PreparedStatement stmt = con.prepareStatement(query);
57
		stmt.setInt(1, id);
58
		return stmt;
59
	}
60

    
61

    
62
	@Override
63
	public List<TaskStored> getTasksOfJob(int id) throws DaoException {
64
		ResultSet rs = null;
65
		Connection con = null;
66
		PreparedStatement stmt = null;
67
		TaskStored retTask = null;
68
		List<TaskStored> retList = null; 
69
		logger.debug("Accessing DB to get all Tasks of Job");
70
		try {
71
			con = getConnection();
72
			String query="SELECT status, success, started, ended, rule_id, record_identifier, record_url FROM tasks WHERE job_id=?";
73
			stmt = con.prepareStatement(query);
74
			stmt.setInt(1, id);
75
			rs = stmt.executeQuery();
76
			if (rs!=null){
77
				retList = new ArrayList<TaskStored>();
78
				
79
				while (rs.next()) {
80
					retTask = new TaskStored();
81
					retTask.setStatus(rs.getString(1));
82
					retTask.setSuccess(rs.getBoolean(2));
83
					retTask.setStarted(rs.getString(3));
84
					retTask.setEnded(rs.getString(4));
85
					retTask.setRuleId(rs.getInt(5));
86
					retTask.setRecordIdentifier(rs.getString(6));
87
					retTask.setRecordUrl(rs.getString(7));
88
					retTask.setJobId(id);
89
					retList.add(retTask);
90
				}				
91
			}
92
		} catch (Exception e) {
93
			logger.error("Accessing DB to get all Tasks of Job.", e);
94
			throw new DaoException(e);
95
		} finally {
96
			if (stmt != null) {
97
				try {
98
					stmt.close();
99
				} catch (SQLException e) {
100
					logger.error("Accessing DB to get all Tasks of Job.", e);
101
					throw new DaoException(e);
102
				}
103
			}
104
		}
105
		return retList;
106

    
107
	}
108
	
109
	@Override
110
	public TaskStored get(int id) {
111
		// TODO Auto-generated method stub
112
		return null;
113
	}
114

    
115
	@Override
116
	public List<String> getValidationErrors(int jobId, int ruleId) throws DaoException {
117
		ResultSet rs = null;
118
		Connection con = null;
119
		PreparedStatement stmt = null;
120
		List<String> retList = null; 
121
		logger.debug("Accessing DB to get Validation Errors of JobId " + jobId + " and RuleId " + ruleId);
122
		try {
123
			con = getConnection();
124
			String query="SELECT record_identifier FROM tasks WHERE job_id=? AND rule_id=? AND success=? LIMIT 30";
125
			stmt = con.prepareStatement(query);
126
			stmt.setInt(1, jobId);
127
			stmt.setInt(2, ruleId);
128
			stmt.setBoolean(3, false);
129
			rs = stmt.executeQuery();
130
			if (rs!=null){
131
				retList = new ArrayList<String>();
132
				
133
				while (rs.next()) {
134
//					if (!rs.getBoolean(1))
135
					retList.add(rs.getString(1));
136
				}				
137
			}
138
		} catch (Exception e) {
139
			logger.error("Accessing DB to get Validation Errors of a JobId and RuleId.", e);
140
			throw new DaoException(e);
141
		} finally {
142
			if (stmt != null) {
143
				try {
144
					stmt.close();
145
				} catch (SQLException e) {
146
					logger.error("Accessing DB to get Validation Errors of a JobId and RuleId.", e);
147
					throw new DaoException(e);
148
				}
149
			}
150
		}
151
		return retList;
152

    
153

    
154
	}
155

    
156
	@Override
157
	public List<String> getDistinctTasksOfJob(int jobId) throws DaoException {
158
		List<String> retList = null;
159
		ResultSet rs = null;
160
		Connection con = null;
161
		PreparedStatement stmt = null;
162
		logger.debug("Accessing DB to get Distinct Rule ids of Tasks");
163
		try {
164
			con = getConnection();
165
			String query="SELECT distinct rule_id from tasks where job_id=?";
166
			stmt = con.prepareStatement(query);
167
			stmt.setInt(1, jobId);
168
			rs = stmt.executeQuery();
169
			if (rs!=null){
170
				retList = new ArrayList<String>();
171
				
172
				while (rs.next()) {
173
					retList.add(Integer.toString(rs.getInt(1)));
174
				}				
175
			}
176
		} catch (Exception e) {
177
			logger.error("Accessing DB to get Distinct Rule ids of Tasks.", e);
178
			throw new DaoException(e);
179
		} finally {
180
			if (stmt != null) {
181
				try {
182
					stmt.close();
183
				} catch (SQLException e) {
184
					logger.error("Accessing DB to get Distinct Rule ids of Tasks.", e);
185
					throw new DaoException(e);
186
				}
187
			}
188
		}
189
		return retList;
190

    
191
		
192
	}
193

    
194
	@Override
195
	public List<TaskStored> getFinishedTasks(int jobId, int ruleId) throws DaoException {
196
		ResultSet rs = null;
197
		Connection con = null;
198
		PreparedStatement stmt = null;
199
		TaskStored retTask = null;
200
		List<TaskStored> retList = null; 
201
		logger.debug("Accessing DB to get Finished Tasks");
202
		try {
203
			con = getConnection();
204
			String query="SELECT success, record_identifier FROM tasks WHERE job_id=? AND rule_id=? AND status=?";
205
			stmt = con.prepareStatement(query);
206
			stmt.setInt(1, jobId);
207
			stmt.setInt(2, ruleId);
208
			stmt.setString(3, "finished");
209
			rs = stmt.executeQuery();
210
			if (rs!=null){
211
				retList = new ArrayList<TaskStored>();
212
				
213
				while (rs.next()) {
214
					retTask = new TaskStored();
215
					retTask.setSuccess(rs.getBoolean(1));
216
					retTask.setRecordIdentifier(rs.getString(2));
217
					retList.add(retTask);
218
				}				
219
			}
220
		} catch (Exception e) {
221
			logger.error("Accessing DB to get Finished Tasks.", e);
222
			throw new DaoException(e);
223
		} finally {
224
			if (stmt != null) {
225
				try {
226
					stmt.close();
227
				} catch (SQLException e) {
228
					logger.error("Accessing DB to get Finished Tasks.", e);
229
					throw new DaoException(e);
230
				}
231
			}
232
		}
233
		return retList;
234

    
235

    
236
	}
237

    
238
	@Override
239
	public void saveTasksBatch(List<TaskStored> tasks, Map<String,List<String>> groupByMap) throws DaoException {
240
		Connection con = null;
241
		PreparedStatement stmt = null, stmt1 = null;
242
		logger.debug("Accessing DB to save batch of tasks");
243
		try {
244
			
245
			con = getConnection();
246
			String query="INSERT INTO tasks(status, success, started, ended, job_id, rule_id, record_identifier, record_url) VALUES(?,?,?,?,?,?,?,?)";stmt = con.prepareStatement(query);
247
			stmt = con.prepareStatement(query);
248
						
249
			for (TaskStored t : tasks ) {
250
				stmt.setString(1, t.getStatus());
251
				stmt.setBoolean(2, t.getSuccess());
252
				stmt.setString(3, t.getStarted());
253
				stmt.setString(4, t.getEnded());
254
				stmt.setInt(5, t.getJobId());
255
				stmt.setInt(6, t.getRuleId());
256
				stmt.setString(7, t.getRecordIdentifier());
257
				stmt.setString(8, t.getRecordUrl());	
258
				stmt.addBatch();
259
		    }
260
		    stmt.executeBatch();
261
		    logger.debug("Tasks inserted: "+tasks.size());
262

    
263
		    if (!groupByMap.isEmpty()) {
264
		    	logger.debug("Inserting record's groupBy values..");
265
		    	for (Map.Entry<String, List<String>> entry : groupByMap.entrySet()) {
266
			    	query="INSERT INTO record_groupby(record_id, groupby, job_id) VALUES(?,?,?)";
267
			    	stmt1 = con.prepareStatement(query);
268
			    	for (String value :entry.getValue()) {
269
			    		stmt1.setString(1, entry.getKey());
270
			    		stmt1.setString(2, value);
271
			    		stmt1.setInt(3, tasks.get(0).getJobId());		
272
			    		stmt1.addBatch();
273
			    	}
274
		    	}
275
		    	stmt1.executeBatch();
276
//		    	logger.debug("groupBy values inserted: "+groupBy_values.size());		    	
277
		    }
278
		    
279
		} catch (Exception e) {
280
			logger.error("Error Accessing DB to save batch of tasks.", e);
281
			throw new DaoException(e);
282
		} finally {
283
			if (stmt != null) {
284
				try {
285
					stmt.close();
286
				} catch (SQLException e) {
287
					logger.error("Accessing DB to save batch of tasks.", e);
288
					throw new DaoException(e);
289
				}
290
			}
291
			if (stmt1 != null) {
292
				try {
293
					stmt1.close();
294
				} catch (SQLException e) {
295
					logger.error("Accessing DB to save batch of tasks.", e);
296
					throw new DaoException(e);
297
				}
298
			}			
299
		}
300
		
301
	}
302

    
303
	@Override
304
	public void saveTasks(List<TaskStored> tasks, List<String> groupBy_values) throws DaoException {
305
		Connection con = null;
306
		PreparedStatement stmt = null, stmt1 = null;
307
		logger.debug("Accessing DB to save batch of tasks");
308
		try {
309
			
310
			con = getConnection();
311
			String query="INSERT INTO tasks(status, success, started, ended, job_id, rule_id, record_identifier, record_url) VALUES(?,?,?,?,?,?,?,?)";stmt = con.prepareStatement(query);
312
			stmt = con.prepareStatement(query);
313
						
314
			for (TaskStored t : tasks ) {
315
				stmt.setString(1, t.getStatus());
316
				stmt.setBoolean(2, t.getSuccess());
317
				stmt.setString(3, t.getStarted());
318
				stmt.setString(4, t.getEnded());
319
				stmt.setInt(5, t.getJobId());
320
				stmt.setInt(6, t.getRuleId());
321
				stmt.setString(7, t.getRecordIdentifier());
322
				stmt.setString(8, t.getRecordUrl());	
323
				stmt.addBatch();
324
		    }
325
		    stmt.executeBatch();
326
		    logger.debug("Tasks inserted: "+tasks.size());
327

    
328
		    if (groupBy_values != null) {
329
		    	logger.debug("Inserting record's groupBy values..");
330
		    	query="INSERT INTO record_groupby(record_id, groupby, job_id) VALUES(?,?,?)";
331
		    	stmt1 = con.prepareStatement(query);
332
		    	for (String value : groupBy_values) {
333
		    		stmt1.setString(1, tasks.get(0).getRecordIdentifier());
334
		    		stmt1.setString(2, value);
335
		    		stmt1.setInt(3, tasks.get(0).getJobId());		
336
		    		stmt1.addBatch();
337
		    	}
338
		    	stmt1.executeBatch();
339
		    	logger.debug("groupBy values inserted: "+groupBy_values.size());		    	
340
		    }
341
		    
342
		} catch (Exception e) {
343
			logger.error("Error Accessing DB to save batch of tasks.", e);
344
			throw new DaoException(e);
345
		} finally {
346
			if (stmt != null) {
347
				try {
348
					stmt.close();
349
				} catch (SQLException e) {
350
					logger.error("Accessing DB to save batch of tasks.", e);
351
					throw new DaoException(e);
352
				}
353
			}
354
			if (stmt1 != null) {
355
				try {
356
					stmt1.close();
357
				} catch (SQLException e) {
358
					logger.error("Accessing DB to save batch of tasks.", e);
359
				}
360
			}			
361
		}
362
		
363
	}
364
	
365
	@Override
366
	public void saveTasks(Map<Integer, RuleStatus> scoreMapPerRule) throws DaoException{
367
		Connection con = null;
368
		PreparedStatement stmt = null;
369
		logger.debug("Accessing DB to save batch of failed tasks");
370
		try {
371
			
372
			con = getConnection();
373
			String query="INSERT INTO tasks(status, success, started, ended, job_id, rule_id, record_identifier, record_url) VALUES(?,?,?,?,?,?,?,?)";
374
			stmt = con.prepareStatement(query);
375
			
376
			for (Entry<Integer, RuleStatus> entry : scoreMapPerRule.entrySet()) {
377
				for (TaskStored t : entry.getValue().getFailedTasks() ) {
378
					stmt.setString(1, t.getStatus());
379
					stmt.setBoolean(2, t.getSuccess());
380
					stmt.setString(3, t.getStarted());
381
					stmt.setString(4, t.getEnded());
382
					stmt.setInt(5, t.getJobId());
383
					stmt.setInt(6, t.getRuleId());
384
					stmt.setString(7, t.getRecordIdentifier());
385
					stmt.setString(8, t.getRecordUrl());	
386
					stmt.addBatch();
387
				}
388
			}
389
		    int result = stmt.executeBatch().length;
390
		    logger.debug("Tasks inserted: "+ result);
391

    
392
		} catch (Exception e) {
393
			logger.error("Error Accessing DB to save batch of failed tasks", e);
394
			throw new DaoException(e);
395
		} finally {
396
			if (stmt != null) {
397
				try {
398
					stmt.close();
399
				} catch (SQLException e) {
400
					logger.error("Accessing DB to save batch of failed tasks.", e);
401
					throw new DaoException(e);
402
				}
403
			}
404
		}
405
		
406
	}
407

    
408
	
409
	@Override
410
	protected int getLastId() throws DaoException {
411
		return 1;
412
//		ResultSet rs = null;
413
//		Connection con = null;
414
//		PreparedStatement stmt = null;
415
//		int retId = -1;
416
//		logger.debug("Accessing DB to get Task's next available id");
417
//		try {
418
//			con = getConnection();
419
//			String query="SELECT currval(pg_get_serial_sequence(?,?)) FROM tasks";
420
//			stmt = con.prepareStatement(query);
421
//			stmt.setString(1, "tasks");
422
//			stmt.setString(2, "rule_id");
423
//			
424
//			rs = stmt.executeQuery();
425
//			if (rs!=null){
426
//				rs.next();
427
//				retId=rs.getInt(1);
428
//			}
429
//
430
//
431
//		} catch (SQLException e) {
432
//			logger.error("Error while accessing DB to get Task's next available id.", e);
433
//		} finally {
434
//			if (stmt != null) {
435
//				try {
436
//					stmt.close();
437
//				} catch (SQLException e) {
438
//					logger.error("Error while accessing DB to get Task's next available id.", e);
439
//				}
440
//			}
441
//		}
442
//		return retId;
443
	}
444

    
445
	@Override
446
	public void cleanTasks(int jobId) throws DaoException{
447
		Connection con = null;
448
		PreparedStatement stmt = null;
449
		logger.debug("Accessing DB to delete unneeded tasks");
450
		try {
451
			con = getConnection();
452
			String query="DELETE FROM tasks WHERE job_id = ? AND success = ?";
453
			stmt = con.prepareStatement(query);
454
			stmt.setInt(1, jobId);
455
			stmt.setBoolean(2, true);
456
			stmt.executeUpdate();
457
			stmt.close();
458
		} catch (Exception e) {
459
			logger.error("Error while Accessing DB to delete unneeded tasks.", e);
460
			throw new DaoException(e);
461
		} finally {
462
			if (stmt != null) {
463
				try {
464
					stmt.close();
465
				} catch (SQLException e) {
466
					logger.error("Error while Accessing DB to delete unneeded tasks.", e);
467
					throw new DaoException(e);
468
				}
469
			}
470
		}
471

    
472
	}
473

    
474
}
(3-3/3)