1
|
package eu.dnetlib.validator.commons.dao.tasks;
|
2
|
|
3
|
import eu.dnetlib.validator.commons.dao.AbstractDAO;
|
4
|
import eu.dnetlib.validator.commons.dao.DaoException;
|
5
|
import eu.dnetlib.validator.commons.dao.rules.RuleStatus;
|
6
|
|
7
|
import java.sql.Connection;
|
8
|
import java.sql.PreparedStatement;
|
9
|
import java.sql.ResultSet;
|
10
|
import java.sql.SQLException;
|
11
|
import java.util.ArrayList;
|
12
|
import java.util.List;
|
13
|
import java.util.Map;
|
14
|
import java.util.Map.Entry;
|
15
|
|
16
|
public class TasksDAOimpl extends AbstractDAO<TaskStored> implements TasksDAO{
|
17
|
|
18
|
@Override
|
19
|
protected PreparedStatement getUpdateStatement(TaskStored t, Connection con)
|
20
|
throws SQLException {
|
21
|
String query="UPDATE tasks set status=?, success=?, started=?, ended=?, record_url=? WHERE job_id=? AND rule_id=? AND record_identifier=?";
|
22
|
PreparedStatement stmt = con.prepareStatement(query);
|
23
|
// logger.debug("getting taskStored updateStatement");
|
24
|
stmt.setString(1, t.getStatus());
|
25
|
stmt.setBoolean(2, t.getSuccess());
|
26
|
stmt.setString(3, t.getStarted());
|
27
|
stmt.setString(4, t.getEnded());
|
28
|
stmt.setString(5, t.getRecordUrl());
|
29
|
stmt.setInt(6, t.getJobId());
|
30
|
stmt.setInt(7, t.getRuleId());
|
31
|
stmt.setString(8, t.getRecordIdentifier());
|
32
|
return stmt;
|
33
|
}
|
34
|
|
35
|
@Override
|
36
|
protected PreparedStatement getInsertStatement(TaskStored t, Connection con) throws SQLException {
|
37
|
String query="INSERT INTO tasks(status, success, started, ended, job_id, rule_id, record_identifier, record_url) VALUES(?,?,?,?,?,?,?,?)";
|
38
|
PreparedStatement stmt = con.prepareStatement(query);
|
39
|
// logger.debug("getting taskStored insertStatement");
|
40
|
stmt.setString(1, t.getStatus());
|
41
|
stmt.setBoolean(2, t.getSuccess());
|
42
|
stmt.setString(3, t.getStarted());
|
43
|
stmt.setString(4, t.getEnded());
|
44
|
stmt.setInt(5, t.getJobId());
|
45
|
stmt.setInt(6, t.getRuleId());
|
46
|
stmt.setString(7, t.getRecordIdentifier());
|
47
|
stmt.setString(8, t.getRecordUrl());
|
48
|
|
49
|
return stmt;
|
50
|
}
|
51
|
|
52
|
@Override
|
53
|
protected PreparedStatement getDeleteStatement(int id, Connection con)
|
54
|
throws SQLException {
|
55
|
String query="DELETE FROM tasks WHERE job_id=?";
|
56
|
PreparedStatement stmt = con.prepareStatement(query);
|
57
|
stmt.setInt(1, id);
|
58
|
return stmt;
|
59
|
}
|
60
|
|
61
|
|
62
|
@Override
|
63
|
public List<TaskStored> getTasksOfJob(int id) throws DaoException {
|
64
|
ResultSet rs = null;
|
65
|
Connection con = null;
|
66
|
PreparedStatement stmt = null;
|
67
|
TaskStored retTask = null;
|
68
|
List<TaskStored> retList = null;
|
69
|
logger.debug("Accessing DB to get all Tasks of Job");
|
70
|
try {
|
71
|
con = getConnection();
|
72
|
String query="SELECT status, success, started, ended, rule_id, record_identifier, record_url FROM tasks WHERE job_id=?";
|
73
|
stmt = con.prepareStatement(query);
|
74
|
stmt.setInt(1, id);
|
75
|
rs = stmt.executeQuery();
|
76
|
if (rs!=null){
|
77
|
retList = new ArrayList<TaskStored>();
|
78
|
|
79
|
while (rs.next()) {
|
80
|
retTask = new TaskStored();
|
81
|
retTask.setStatus(rs.getString(1));
|
82
|
retTask.setSuccess(rs.getBoolean(2));
|
83
|
retTask.setStarted(rs.getString(3));
|
84
|
retTask.setEnded(rs.getString(4));
|
85
|
retTask.setRuleId(rs.getInt(5));
|
86
|
retTask.setRecordIdentifier(rs.getString(6));
|
87
|
retTask.setRecordUrl(rs.getString(7));
|
88
|
retTask.setJobId(id);
|
89
|
retList.add(retTask);
|
90
|
}
|
91
|
}
|
92
|
} catch (Exception e) {
|
93
|
logger.error("Accessing DB to get all Tasks of Job.", e);
|
94
|
throw new DaoException(e);
|
95
|
} finally {
|
96
|
if (stmt != null) {
|
97
|
try {
|
98
|
stmt.close();
|
99
|
} catch (SQLException e) {
|
100
|
logger.error("Accessing DB to get all Tasks of Job.", e);
|
101
|
throw new DaoException(e);
|
102
|
}
|
103
|
}
|
104
|
}
|
105
|
return retList;
|
106
|
|
107
|
}
|
108
|
|
109
|
@Override
|
110
|
public TaskStored get(int id) {
|
111
|
// TODO Auto-generated method stub
|
112
|
return null;
|
113
|
}
|
114
|
|
115
|
@Override
|
116
|
public List<String> getValidationErrors(int jobId, int ruleId) throws DaoException {
|
117
|
ResultSet rs = null;
|
118
|
Connection con = null;
|
119
|
PreparedStatement stmt = null;
|
120
|
List<String> retList = null;
|
121
|
logger.debug("Accessing DB to get Validation Errors of JobId " + jobId + " and RuleId " + ruleId);
|
122
|
try {
|
123
|
con = getConnection();
|
124
|
String query="SELECT record_identifier FROM tasks WHERE job_id=? AND rule_id=? AND success=? LIMIT 30";
|
125
|
stmt = con.prepareStatement(query);
|
126
|
stmt.setInt(1, jobId);
|
127
|
stmt.setInt(2, ruleId);
|
128
|
stmt.setBoolean(3, false);
|
129
|
rs = stmt.executeQuery();
|
130
|
if (rs!=null){
|
131
|
retList = new ArrayList<String>();
|
132
|
|
133
|
while (rs.next()) {
|
134
|
// if (!rs.getBoolean(1))
|
135
|
retList.add(rs.getString(1));
|
136
|
}
|
137
|
}
|
138
|
} catch (Exception e) {
|
139
|
logger.error("Accessing DB to get Validation Errors of a JobId and RuleId.", e);
|
140
|
throw new DaoException(e);
|
141
|
} finally {
|
142
|
if (stmt != null) {
|
143
|
try {
|
144
|
stmt.close();
|
145
|
} catch (SQLException e) {
|
146
|
logger.error("Accessing DB to get Validation Errors of a JobId and RuleId.", e);
|
147
|
throw new DaoException(e);
|
148
|
}
|
149
|
}
|
150
|
}
|
151
|
return retList;
|
152
|
|
153
|
|
154
|
}
|
155
|
|
156
|
@Override
|
157
|
public List<String> getDistinctTasksOfJob(int jobId) throws DaoException {
|
158
|
List<String> retList = null;
|
159
|
ResultSet rs = null;
|
160
|
Connection con = null;
|
161
|
PreparedStatement stmt = null;
|
162
|
logger.debug("Accessing DB to get Distinct Rule ids of Tasks");
|
163
|
try {
|
164
|
con = getConnection();
|
165
|
String query="SELECT distinct rule_id from tasks where job_id=?";
|
166
|
stmt = con.prepareStatement(query);
|
167
|
stmt.setInt(1, jobId);
|
168
|
rs = stmt.executeQuery();
|
169
|
if (rs!=null){
|
170
|
retList = new ArrayList<String>();
|
171
|
|
172
|
while (rs.next()) {
|
173
|
retList.add(Integer.toString(rs.getInt(1)));
|
174
|
}
|
175
|
}
|
176
|
} catch (Exception e) {
|
177
|
logger.error("Accessing DB to get Distinct Rule ids of Tasks.", e);
|
178
|
throw new DaoException(e);
|
179
|
} finally {
|
180
|
if (stmt != null) {
|
181
|
try {
|
182
|
stmt.close();
|
183
|
} catch (SQLException e) {
|
184
|
logger.error("Accessing DB to get Distinct Rule ids of Tasks.", e);
|
185
|
throw new DaoException(e);
|
186
|
}
|
187
|
}
|
188
|
}
|
189
|
return retList;
|
190
|
|
191
|
|
192
|
}
|
193
|
|
194
|
@Override
|
195
|
public List<TaskStored> getFinishedTasks(int jobId, int ruleId) throws DaoException {
|
196
|
ResultSet rs = null;
|
197
|
Connection con = null;
|
198
|
PreparedStatement stmt = null;
|
199
|
TaskStored retTask = null;
|
200
|
List<TaskStored> retList = null;
|
201
|
logger.debug("Accessing DB to get Finished Tasks");
|
202
|
try {
|
203
|
con = getConnection();
|
204
|
String query="SELECT success, record_identifier FROM tasks WHERE job_id=? AND rule_id=? AND status=?";
|
205
|
stmt = con.prepareStatement(query);
|
206
|
stmt.setInt(1, jobId);
|
207
|
stmt.setInt(2, ruleId);
|
208
|
stmt.setString(3, "finished");
|
209
|
rs = stmt.executeQuery();
|
210
|
if (rs!=null){
|
211
|
retList = new ArrayList<TaskStored>();
|
212
|
|
213
|
while (rs.next()) {
|
214
|
retTask = new TaskStored();
|
215
|
retTask.setSuccess(rs.getBoolean(1));
|
216
|
retTask.setRecordIdentifier(rs.getString(2));
|
217
|
retList.add(retTask);
|
218
|
}
|
219
|
}
|
220
|
} catch (Exception e) {
|
221
|
logger.error("Accessing DB to get Finished Tasks.", e);
|
222
|
throw new DaoException(e);
|
223
|
} finally {
|
224
|
if (stmt != null) {
|
225
|
try {
|
226
|
stmt.close();
|
227
|
} catch (SQLException e) {
|
228
|
logger.error("Accessing DB to get Finished Tasks.", e);
|
229
|
throw new DaoException(e);
|
230
|
}
|
231
|
}
|
232
|
}
|
233
|
return retList;
|
234
|
|
235
|
|
236
|
}
|
237
|
|
238
|
@Override
|
239
|
public void saveTasksBatch(List<TaskStored> tasks, Map<String,List<String>> groupByMap) throws DaoException {
|
240
|
Connection con = null;
|
241
|
PreparedStatement stmt = null, stmt1 = null;
|
242
|
logger.debug("Accessing DB to save batch of tasks");
|
243
|
try {
|
244
|
|
245
|
con = getConnection();
|
246
|
String query="INSERT INTO tasks(status, success, started, ended, job_id, rule_id, record_identifier, record_url) VALUES(?,?,?,?,?,?,?,?)";stmt = con.prepareStatement(query);
|
247
|
stmt = con.prepareStatement(query);
|
248
|
|
249
|
for (TaskStored t : tasks ) {
|
250
|
stmt.setString(1, t.getStatus());
|
251
|
stmt.setBoolean(2, t.getSuccess());
|
252
|
stmt.setString(3, t.getStarted());
|
253
|
stmt.setString(4, t.getEnded());
|
254
|
stmt.setInt(5, t.getJobId());
|
255
|
stmt.setInt(6, t.getRuleId());
|
256
|
stmt.setString(7, t.getRecordIdentifier());
|
257
|
stmt.setString(8, t.getRecordUrl());
|
258
|
stmt.addBatch();
|
259
|
}
|
260
|
stmt.executeBatch();
|
261
|
logger.debug("Tasks inserted: "+tasks.size());
|
262
|
|
263
|
if (!groupByMap.isEmpty()) {
|
264
|
logger.debug("Inserting record's groupBy values..");
|
265
|
for (Map.Entry<String, List<String>> entry : groupByMap.entrySet()) {
|
266
|
query="INSERT INTO record_groupby(record_id, groupby, job_id) VALUES(?,?,?)";
|
267
|
stmt1 = con.prepareStatement(query);
|
268
|
for (String value :entry.getValue()) {
|
269
|
stmt1.setString(1, entry.getKey());
|
270
|
stmt1.setString(2, value);
|
271
|
stmt1.setInt(3, tasks.get(0).getJobId());
|
272
|
stmt1.addBatch();
|
273
|
}
|
274
|
}
|
275
|
stmt1.executeBatch();
|
276
|
// logger.debug("groupBy values inserted: "+groupBy_values.size());
|
277
|
}
|
278
|
|
279
|
} catch (Exception e) {
|
280
|
logger.error("Error Accessing DB to save batch of tasks.", e);
|
281
|
throw new DaoException(e);
|
282
|
} finally {
|
283
|
if (stmt != null) {
|
284
|
try {
|
285
|
stmt.close();
|
286
|
} catch (SQLException e) {
|
287
|
logger.error("Accessing DB to save batch of tasks.", e);
|
288
|
throw new DaoException(e);
|
289
|
}
|
290
|
}
|
291
|
if (stmt1 != null) {
|
292
|
try {
|
293
|
stmt1.close();
|
294
|
} catch (SQLException e) {
|
295
|
logger.error("Accessing DB to save batch of tasks.", e);
|
296
|
throw new DaoException(e);
|
297
|
}
|
298
|
}
|
299
|
}
|
300
|
|
301
|
}
|
302
|
|
303
|
@Override
|
304
|
public void saveTasks(List<TaskStored> tasks, List<String> groupBy_values) throws DaoException {
|
305
|
Connection con = null;
|
306
|
PreparedStatement stmt = null, stmt1 = null;
|
307
|
logger.debug("Accessing DB to save batch of tasks");
|
308
|
try {
|
309
|
|
310
|
con = getConnection();
|
311
|
String query="INSERT INTO tasks(status, success, started, ended, job_id, rule_id, record_identifier, record_url) VALUES(?,?,?,?,?,?,?,?)";stmt = con.prepareStatement(query);
|
312
|
stmt = con.prepareStatement(query);
|
313
|
|
314
|
for (TaskStored t : tasks ) {
|
315
|
stmt.setString(1, t.getStatus());
|
316
|
stmt.setBoolean(2, t.getSuccess());
|
317
|
stmt.setString(3, t.getStarted());
|
318
|
stmt.setString(4, t.getEnded());
|
319
|
stmt.setInt(5, t.getJobId());
|
320
|
stmt.setInt(6, t.getRuleId());
|
321
|
stmt.setString(7, t.getRecordIdentifier());
|
322
|
stmt.setString(8, t.getRecordUrl());
|
323
|
stmt.addBatch();
|
324
|
}
|
325
|
stmt.executeBatch();
|
326
|
logger.debug("Tasks inserted: "+tasks.size());
|
327
|
|
328
|
if (groupBy_values != null) {
|
329
|
logger.debug("Inserting record's groupBy values..");
|
330
|
query="INSERT INTO record_groupby(record_id, groupby, job_id) VALUES(?,?,?)";
|
331
|
stmt1 = con.prepareStatement(query);
|
332
|
for (String value : groupBy_values) {
|
333
|
stmt1.setString(1, tasks.get(0).getRecordIdentifier());
|
334
|
stmt1.setString(2, value);
|
335
|
stmt1.setInt(3, tasks.get(0).getJobId());
|
336
|
stmt1.addBatch();
|
337
|
}
|
338
|
stmt1.executeBatch();
|
339
|
logger.debug("groupBy values inserted: "+groupBy_values.size());
|
340
|
}
|
341
|
|
342
|
} catch (Exception e) {
|
343
|
logger.error("Error Accessing DB to save batch of tasks.", e);
|
344
|
throw new DaoException(e);
|
345
|
} finally {
|
346
|
if (stmt != null) {
|
347
|
try {
|
348
|
stmt.close();
|
349
|
} catch (SQLException e) {
|
350
|
logger.error("Accessing DB to save batch of tasks.", e);
|
351
|
throw new DaoException(e);
|
352
|
}
|
353
|
}
|
354
|
if (stmt1 != null) {
|
355
|
try {
|
356
|
stmt1.close();
|
357
|
} catch (SQLException e) {
|
358
|
logger.error("Accessing DB to save batch of tasks.", e);
|
359
|
}
|
360
|
}
|
361
|
}
|
362
|
|
363
|
}
|
364
|
|
365
|
@Override
|
366
|
public void saveTasks(Map<Integer, RuleStatus> scoreMapPerRule) throws DaoException{
|
367
|
Connection con = null;
|
368
|
PreparedStatement stmt = null;
|
369
|
logger.debug("Accessing DB to save batch of failed tasks");
|
370
|
try {
|
371
|
|
372
|
con = getConnection();
|
373
|
String query="INSERT INTO tasks(status, success, started, ended, job_id, rule_id, record_identifier, record_url) VALUES(?,?,?,?,?,?,?,?)";
|
374
|
stmt = con.prepareStatement(query);
|
375
|
|
376
|
for (Entry<Integer, RuleStatus> entry : scoreMapPerRule.entrySet()) {
|
377
|
for (TaskStored t : entry.getValue().getFailedTasks() ) {
|
378
|
stmt.setString(1, t.getStatus());
|
379
|
stmt.setBoolean(2, t.getSuccess());
|
380
|
stmt.setString(3, t.getStarted());
|
381
|
stmt.setString(4, t.getEnded());
|
382
|
stmt.setInt(5, t.getJobId());
|
383
|
stmt.setInt(6, t.getRuleId());
|
384
|
stmt.setString(7, t.getRecordIdentifier());
|
385
|
stmt.setString(8, t.getRecordUrl());
|
386
|
stmt.addBatch();
|
387
|
}
|
388
|
}
|
389
|
int result = stmt.executeBatch().length;
|
390
|
logger.debug("Tasks inserted: "+ result);
|
391
|
|
392
|
} catch (Exception e) {
|
393
|
logger.error("Error Accessing DB to save batch of failed tasks", e);
|
394
|
throw new DaoException(e);
|
395
|
} finally {
|
396
|
if (stmt != null) {
|
397
|
try {
|
398
|
stmt.close();
|
399
|
} catch (SQLException e) {
|
400
|
logger.error("Accessing DB to save batch of failed tasks.", e);
|
401
|
throw new DaoException(e);
|
402
|
}
|
403
|
}
|
404
|
}
|
405
|
|
406
|
}
|
407
|
|
408
|
|
409
|
@Override
|
410
|
protected int getLastId() throws DaoException {
|
411
|
return 1;
|
412
|
// ResultSet rs = null;
|
413
|
// Connection con = null;
|
414
|
// PreparedStatement stmt = null;
|
415
|
// int retId = -1;
|
416
|
// logger.debug("Accessing DB to get Task's next available id");
|
417
|
// try {
|
418
|
// con = getConnection();
|
419
|
// String query="SELECT currval(pg_get_serial_sequence(?,?)) FROM tasks";
|
420
|
// stmt = con.prepareStatement(query);
|
421
|
// stmt.setString(1, "tasks");
|
422
|
// stmt.setString(2, "rule_id");
|
423
|
//
|
424
|
// rs = stmt.executeQuery();
|
425
|
// if (rs!=null){
|
426
|
// rs.next();
|
427
|
// retId=rs.getInt(1);
|
428
|
// }
|
429
|
//
|
430
|
//
|
431
|
// } catch (SQLException e) {
|
432
|
// logger.error("Error while accessing DB to get Task's next available id.", e);
|
433
|
// } finally {
|
434
|
// if (stmt != null) {
|
435
|
// try {
|
436
|
// stmt.close();
|
437
|
// } catch (SQLException e) {
|
438
|
// logger.error("Error while accessing DB to get Task's next available id.", e);
|
439
|
// }
|
440
|
// }
|
441
|
// }
|
442
|
// return retId;
|
443
|
}
|
444
|
|
445
|
@Override
|
446
|
public void cleanTasks(int jobId) throws DaoException{
|
447
|
Connection con = null;
|
448
|
PreparedStatement stmt = null;
|
449
|
logger.debug("Accessing DB to delete unneeded tasks");
|
450
|
try {
|
451
|
con = getConnection();
|
452
|
String query="DELETE FROM tasks WHERE job_id = ? AND success = ?";
|
453
|
stmt = con.prepareStatement(query);
|
454
|
stmt.setInt(1, jobId);
|
455
|
stmt.setBoolean(2, true);
|
456
|
stmt.executeUpdate();
|
457
|
stmt.close();
|
458
|
} catch (Exception e) {
|
459
|
logger.error("Error while Accessing DB to delete unneeded tasks.", e);
|
460
|
throw new DaoException(e);
|
461
|
} finally {
|
462
|
if (stmt != null) {
|
463
|
try {
|
464
|
stmt.close();
|
465
|
} catch (SQLException e) {
|
466
|
logger.error("Error while Accessing DB to delete unneeded tasks.", e);
|
467
|
throw new DaoException(e);
|
468
|
}
|
469
|
}
|
470
|
}
|
471
|
|
472
|
}
|
473
|
|
474
|
}
|