1
|
package eu.dnetlib.validator.service.impls.listeners;
|
2
|
|
3
|
import java.util.ArrayList;
|
4
|
import java.util.HashMap;
|
5
|
import java.util.List;
|
6
|
import java.util.Map;
|
7
|
import java.util.Set;
|
8
|
|
9
|
import javax.xml.xpath.XPath;
|
10
|
import javax.xml.xpath.XPathConstants;
|
11
|
import javax.xml.xpath.XPathExpressionException;
|
12
|
import javax.xml.xpath.XPathFactory;
|
13
|
|
14
|
import org.apache.log4j.Logger;
|
15
|
import org.springframework.transaction.annotation.Propagation;
|
16
|
import org.springframework.transaction.annotation.Transactional;
|
17
|
import org.w3c.dom.Node;
|
18
|
import org.w3c.dom.NodeList;
|
19
|
|
20
|
import eu.dnetlib.domain.functionality.validator.Rule;
|
21
|
import eu.dnetlib.validator.commons.dao.DaoException;
|
22
|
import eu.dnetlib.validator.commons.dao.jobs.JobsDAO;
|
23
|
import eu.dnetlib.validator.commons.dao.rules.RuleStatus;
|
24
|
import eu.dnetlib.validator.commons.dao.rules.RulesDAO;
|
25
|
import eu.dnetlib.validator.commons.dao.tasks.TaskStored;
|
26
|
import eu.dnetlib.validator.commons.dao.tasks.TasksDAO;
|
27
|
import eu.dnetlib.validator.engine.ValidatorException;
|
28
|
import eu.dnetlib.validator.engine.execution.CompletedTask;
|
29
|
import eu.dnetlib.validator.engine.execution.JobListener;
|
30
|
|
31
|
/**
|
32
|
*
|
33
|
* @author Nikon Gasparis
|
34
|
*
|
35
|
*/
|
36
|
public class ValidatorJobListener implements JobListener {
|
37
|
private static Logger logger = Logger.getLogger(ValidatorJobListener.class);
|
38
|
|
39
|
private Integer jobSubmittedId;
|
40
|
private String jobSubmittedUser;
|
41
|
private TasksDAO tasksDao;
|
42
|
private JobsDAO jobsDao;
|
43
|
private RulesDAO rulesDao;
|
44
|
private String validationType;
|
45
|
private Set<Integer> blacklistRuleIds;
|
46
|
|
47
|
private int objsValidated = 0;
|
48
|
private int score = 0;
|
49
|
private int internalJobsFinished = 0;
|
50
|
private int internalJobsSum = 1;
|
51
|
|
52
|
Map<String,Map<Integer,RuleStatus>> scoreMapPerGroupBy = new HashMap<String,Map<Integer,RuleStatus>>();
|
53
|
private String valBaseUrl = null;
|
54
|
|
55
|
|
56
|
|
57
|
private Map<Integer, Rule> ruleCacheMap = new HashMap<Integer, Rule>();
|
58
|
private String groupBy_xpath = null;
|
59
|
|
60
|
@Override
|
61
|
@Transactional(propagation = Propagation.REQUIRED)
|
62
|
public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext, Throwable t) throws ValidatorException {
|
63
|
try {
|
64
|
logger.error("Completed tasks for " + jobId + " (exception). Error: " + t.getMessage());
|
65
|
List<TaskStored> tasksStored = new ArrayList<TaskStored>();
|
66
|
List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
|
67
|
for (CompletedTask ctask : tasks) {
|
68
|
TaskStored taskStored = new TaskStored();
|
69
|
taskStored.setEnded(ctask.finished.toString());
|
70
|
taskStored.setStarted(ctask.started.toString());
|
71
|
taskStored.setJobId(jobSubmittedId);
|
72
|
taskStored.setRecordIdentifier(ctask.valobjId);
|
73
|
taskStored.setRuleId(ctask.ruleId);
|
74
|
taskStored.setSuccess(ctask.success);
|
75
|
taskStored.setStatus("finished");
|
76
|
tasksStored.add(taskStored);
|
77
|
|
78
|
logger.debug("JOBID:"+jobSubmittedId+"# Task-failed: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
|
79
|
" ruleId: "+taskStored.getRuleId() + " error:"+ t.getMessage());
|
80
|
}
|
81
|
this.updateScoreMap(groupBy_values, tasksStored);
|
82
|
if (objsValidated % 100 == 0)
|
83
|
jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
|
84
|
objsValidated++;
|
85
|
} catch (Exception e) {
|
86
|
logger.error("Error while proccessing results");
|
87
|
throw new ValidatorException("Error while proccessing results", e);
|
88
|
}
|
89
|
}
|
90
|
|
91
|
@Override
|
92
|
@Transactional(propagation = Propagation.REQUIRED)
|
93
|
public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext) throws ValidatorException {
|
94
|
try {
|
95
|
List<Map<String, String>> veloList = new ArrayList<Map<String, String>>();
|
96
|
|
97
|
logger.debug("JOBID:"+jobSubmittedId+"# Updating Completed Tasks");
|
98
|
Map<Integer,Boolean> ruleIdsSuccess = new HashMap<Integer,Boolean>();
|
99
|
Map<String, String> veloMap = null;
|
100
|
List<TaskStored> tasksStored = new ArrayList<TaskStored>();
|
101
|
List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
|
102
|
|
103
|
int recordScore = -1;
|
104
|
for (CompletedTask ctask : tasks) {
|
105
|
veloMap = new HashMap<String, String>();
|
106
|
TaskStored taskStored = new TaskStored();
|
107
|
taskStored.setEnded(ctask.finished.toString());
|
108
|
taskStored.setStarted(ctask.started.toString());
|
109
|
taskStored.setJobId(jobSubmittedId);
|
110
|
taskStored.setRecordIdentifier(ctask.valobjId);
|
111
|
taskStored.setRuleId(ctask.ruleId);
|
112
|
taskStored.setSuccess(ctask.success);
|
113
|
taskStored.setStatus("finished");
|
114
|
tasksStored.add(taskStored);
|
115
|
|
116
|
// logger.debug("JOBID:"+jobSubmittedId+"# Task: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
|
117
|
// " ruleId: "+taskStored.getRuleId());
|
118
|
ruleIdsSuccess.put(taskStored.getRuleId(),ctask.success);
|
119
|
|
120
|
// logger.debug("inserted on map: "+ "id: " + taskStored.getRuleId());
|
121
|
if (!ctask.success) {
|
122
|
veloMap.put("id", Integer.toString(taskStored.getRuleId()));
|
123
|
veloMap.put("error", "...");
|
124
|
veloList.add(veloMap);
|
125
|
// if (scoreTriggerRules.contains(ctask.ruleId)) {
|
126
|
// recordScore=0;
|
127
|
// }
|
128
|
Rule rule = this.getRule(taskStored.getRuleId());
|
129
|
String xpath = null;
|
130
|
veloMap.put("name", rule.getName());
|
131
|
if (! rule.getType().contains("Chain"))
|
132
|
xpath = rule.getConfiguration().getProperty("xpath");
|
133
|
else {
|
134
|
xpath = this.getRule(Integer.parseInt(rule.getConfiguration().getProperty("rule_2"))).getConfiguration().getProperty("xpath");
|
135
|
}
|
136
|
veloMap.put("xpath", xpath);
|
137
|
veloMap.put("mandatory", Boolean.toString(rule.isMandatory()));
|
138
|
}
|
139
|
|
140
|
}
|
141
|
this.updateScoreMap(groupBy_values, tasksStored);
|
142
|
|
143
|
// WORKFLOWS BEGIN
|
144
|
int recordBlacklistScore = -1;
|
145
|
if (recordScore == -1) {
|
146
|
recordScore = this.calculateRecordScore(ruleIdsSuccess);
|
147
|
if (blacklistRuleIds != null )
|
148
|
recordBlacklistScore = this.calculateBlacklistRecordScore(ruleIdsSuccess, blacklistRuleIds);
|
149
|
}
|
150
|
|
151
|
Map<String,String> recordValidationResult = new HashMap<String,String>();
|
152
|
recordValidationResult.put("score", Integer.toString(score));
|
153
|
recordValidationResult.put("id", tasks.get(0).valobjId);
|
154
|
if (score > 0)
|
155
|
recordValidationResult.put("status", "pass");
|
156
|
else
|
157
|
recordValidationResult.put("status", "fail");
|
158
|
|
159
|
recordContext.put("veloList", (List<Map<String, String>>) veloList);
|
160
|
recordContext.put("recordValidationResult", (Map<String,String>) recordValidationResult);
|
161
|
recordContext.put("score", (int) score);
|
162
|
recordContext.put("recordBlacklistScore", (int) recordBlacklistScore);
|
163
|
|
164
|
// WORKFLOWS END
|
165
|
if (objsValidated % 100 == 0)
|
166
|
jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
|
167
|
objsValidated++;
|
168
|
} catch (Exception e) {
|
169
|
logger.error("Error while proccessing results");
|
170
|
throw new ValidatorException("Error while proccessing results", e);
|
171
|
}
|
172
|
}
|
173
|
|
174
|
@Override
|
175
|
@Transactional(propagation = Propagation.REQUIRED)
|
176
|
public synchronized void finished(int jobId, Map<String,Object> jobContext) {
|
177
|
try {
|
178
|
internalJobsFinished++;
|
179
|
if(internalJobsFinished == internalJobsSum) {
|
180
|
logger.debug("JOBID:"+ this.jobSubmittedId +"# Set job finished");
|
181
|
jobContext.put("jobSubmittedId", (int) jobSubmittedId);
|
182
|
jobContext.put("jobSubmittedUser", (String) jobSubmittedUser);
|
183
|
|
184
|
score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy, null, false, objsValidated, validationType);
|
185
|
logger.debug("score_"+validationType +": " + score);
|
186
|
jobContext.put("score_"+validationType, (int) score);
|
187
|
tasksDao.saveTasks(this.scoreMapPerGroupBy.get("all"));
|
188
|
if (jobSubmittedUser.contains("Workflow")) {
|
189
|
jobsDao.setTotalJobFinished(jobId, null, false);
|
190
|
}
|
191
|
} else {
|
192
|
logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
|
193
|
}
|
194
|
} catch (Exception e) {
|
195
|
logger.error("Error while finalizing successfull job");
|
196
|
}
|
197
|
}
|
198
|
|
199
|
@Override
|
200
|
@Transactional(propagation = Propagation.REQUIRED)
|
201
|
public synchronized void failed(int jobId, Map<String,Object> jobContext, Throwable t) {
|
202
|
try {
|
203
|
internalJobsFinished++;
|
204
|
if(internalJobsFinished == internalJobsSum) {
|
205
|
logger.debug("JOBID:"+jobSubmittedId+"# Set job finished-failed");
|
206
|
jobContext.put("jobSubmittedId", (int) jobSubmittedId);
|
207
|
jobContext.put("jobSubmittedUser",(String) jobSubmittedUser);
|
208
|
score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy,t.getMessage(), true, objsValidated, validationType);
|
209
|
jobContext.put("score_"+validationType, (int) 0);
|
210
|
|
211
|
if (jobSubmittedUser.contains("Workflow")) {
|
212
|
jobsDao.setTotalJobFinished(jobId, t.getMessage(), true);
|
213
|
}
|
214
|
} else {
|
215
|
logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
|
216
|
}
|
217
|
} catch (Exception e) {
|
218
|
logger.error("Error while finalizing failed job");
|
219
|
}
|
220
|
}
|
221
|
|
222
|
private synchronized void updateScoreMap(List<String> groupBy_values, List<TaskStored> tasksStored) throws Exception {
|
223
|
for (String groupBy : groupBy_values) {
|
224
|
logger.debug("updating map for groupBy value: " + groupBy);
|
225
|
Map<Integer,RuleStatus> scoreMapPerRule = null;
|
226
|
if((scoreMapPerRule=this.scoreMapPerGroupBy.get(groupBy)) == null) {
|
227
|
logger.debug("map for groupBy value: " + groupBy + " doesn't exist");
|
228
|
scoreMapPerRule = new HashMap<Integer, RuleStatus>();
|
229
|
} else {
|
230
|
logger.debug("map for groupBy value: " + groupBy + " exists");
|
231
|
}
|
232
|
for (TaskStored task : tasksStored) {
|
233
|
RuleStatus ruleSt = null;
|
234
|
if((ruleSt=scoreMapPerRule.get(task.getRuleId())) == null) {
|
235
|
logger.debug("ruleStatus for rule with id : " + task.getRuleId() + " doesn't exist");
|
236
|
ruleSt = new RuleStatus();
|
237
|
}
|
238
|
Rule rule = null;
|
239
|
if((rule=ruleCacheMap.get(task.getRuleId())) == null) {
|
240
|
rule = rulesDao.get(task.getRuleId());
|
241
|
ruleCacheMap.put(task.getRuleId(), rule);
|
242
|
}
|
243
|
ruleSt.setMandatory(rule.isMandatory());
|
244
|
ruleSt.setWeight(rule.getWeight());
|
245
|
ruleSt.total++;
|
246
|
if (task.getSuccess())
|
247
|
ruleSt.success++;
|
248
|
else {
|
249
|
if (groupBy.equals("all") && ruleSt.getFailedTasks().size() < 10 )
|
250
|
ruleSt.getFailedTasks().add(task);
|
251
|
}
|
252
|
scoreMapPerRule.put(task.getRuleId(), ruleSt);
|
253
|
}
|
254
|
this.scoreMapPerGroupBy.put(groupBy, scoreMapPerRule);
|
255
|
}
|
256
|
}
|
257
|
|
258
|
private int calculateRecordScore(Map<Integer,Boolean> ruleIdsSuccess) throws Exception {
|
259
|
float score = 0;
|
260
|
float sum = 0;
|
261
|
float weights = 0 ;
|
262
|
for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
|
263
|
Rule rule = ruleCacheMap.get(entry.getKey());
|
264
|
if (rule.isMandatory()) {
|
265
|
weights += rule.getWeight();
|
266
|
if (entry.getValue()) {
|
267
|
sum += rule.getWeight();
|
268
|
}
|
269
|
}
|
270
|
score = (sum/weights)*100;
|
271
|
|
272
|
}
|
273
|
return (int) score;
|
274
|
}
|
275
|
|
276
|
private int calculateBlacklistRecordScore(Map<Integer,Boolean> ruleIdsSuccess, Set<Integer> blacklistRuleIds) throws Exception {
|
277
|
float score = 0;
|
278
|
float sum = 0;
|
279
|
float weights = 0 ;
|
280
|
for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
|
281
|
Rule rule = ruleCacheMap.get(entry.getKey());
|
282
|
if (rule.isMandatory() && blacklistRuleIds.contains(entry.getKey())) {
|
283
|
weights += rule.getWeight();
|
284
|
if (entry.getValue()) {
|
285
|
sum += rule.getWeight();
|
286
|
}
|
287
|
}
|
288
|
score = (sum/weights)*100;
|
289
|
}
|
290
|
return (int) score;
|
291
|
}
|
292
|
|
293
|
private synchronized List<String> parseGroupByValues(Object record, String xpath) {
|
294
|
List<String> groupBy_values = null;
|
295
|
logger.debug("groupBy field: "+xpath);
|
296
|
try{
|
297
|
XPathFactory factory = XPathFactory.newInstance();
|
298
|
XPath xPath = factory.newXPath();
|
299
|
NodeList rtl = (NodeList) xPath.evaluate(xpath+"/text()",record ,XPathConstants.NODESET);
|
300
|
groupBy_values = new ArrayList<String>();
|
301
|
if (rtl.getLength() > 0) {
|
302
|
for(int i=0; i<rtl.getLength(); i++) {
|
303
|
Node childNode = rtl.item(i);
|
304
|
groupBy_values.add(childNode.getNodeValue());
|
305
|
logger.debug("value: "+childNode.getNodeValue());
|
306
|
}
|
307
|
groupBy_values.add("all");
|
308
|
} else {
|
309
|
groupBy_values.add("all");
|
310
|
}
|
311
|
|
312
|
} catch (XPathExpressionException e) {
|
313
|
logger.error("error getting object"+ e);
|
314
|
}
|
315
|
return groupBy_values;
|
316
|
}
|
317
|
|
318
|
private Rule getRule(int id) throws DaoException {
|
319
|
Rule rule = null;
|
320
|
if((rule=ruleCacheMap.get(id)) == null) {
|
321
|
rule = rulesDao.get(id);
|
322
|
ruleCacheMap.put(id, rule);
|
323
|
}
|
324
|
return rule;
|
325
|
|
326
|
}
|
327
|
public Integer getJobSubmittedId() {
|
328
|
return jobSubmittedId;
|
329
|
}
|
330
|
|
331
|
public void setJobSubmittedId(Integer jobSubmittedId) {
|
332
|
this.jobSubmittedId = jobSubmittedId;
|
333
|
}
|
334
|
|
335
|
public String getJobSubmittedUser() {
|
336
|
return jobSubmittedUser;
|
337
|
}
|
338
|
|
339
|
public void setJobSubmittedUser(String jobSubmittedUser) {
|
340
|
this.jobSubmittedUser = jobSubmittedUser;
|
341
|
}
|
342
|
|
343
|
public String getGroupBy_xpath() {
|
344
|
return groupBy_xpath;
|
345
|
}
|
346
|
|
347
|
public void setGroupBy_xpath(String groupBy_xpath) {
|
348
|
this.groupBy_xpath = groupBy_xpath;
|
349
|
}
|
350
|
|
351
|
public String getValBaseUrl() {
|
352
|
return valBaseUrl;
|
353
|
}
|
354
|
|
355
|
public void setValBaseUrl(String valBaseUrl) {
|
356
|
this.valBaseUrl = valBaseUrl;
|
357
|
}
|
358
|
|
359
|
public int getInternalJobsSum() {
|
360
|
return internalJobsSum;
|
361
|
}
|
362
|
|
363
|
public void setInternalJobsSum(int internalJobsSum) {
|
364
|
this.internalJobsSum = internalJobsSum;
|
365
|
}
|
366
|
|
367
|
|
368
|
public Map<String, Map<Integer, RuleStatus>> getScoreMapPerGroupBy() {
|
369
|
return scoreMapPerGroupBy;
|
370
|
}
|
371
|
|
372
|
public void setScoreMapPerGroupBy(
|
373
|
Map<String, Map<Integer, RuleStatus>> scoreMapPerGroupBy) {
|
374
|
this.scoreMapPerGroupBy = scoreMapPerGroupBy;
|
375
|
}
|
376
|
|
377
|
public TasksDAO getTasksDao() {
|
378
|
return tasksDao;
|
379
|
}
|
380
|
|
381
|
public void setTasksDao(TasksDAO tasksDao) {
|
382
|
this.tasksDao = tasksDao;
|
383
|
}
|
384
|
|
385
|
public JobsDAO getJobsDao() {
|
386
|
return jobsDao;
|
387
|
}
|
388
|
|
389
|
public void setJobsDao(JobsDAO jobsDao) {
|
390
|
this.jobsDao = jobsDao;
|
391
|
}
|
392
|
|
393
|
public RulesDAO getRulesDao() {
|
394
|
return rulesDao;
|
395
|
}
|
396
|
|
397
|
public void setRulesDao(RulesDAO rulesDao) {
|
398
|
this.rulesDao = rulesDao;
|
399
|
}
|
400
|
|
401
|
public Map<Integer, Rule> getRuleCacheMap() {
|
402
|
return ruleCacheMap;
|
403
|
}
|
404
|
|
405
|
public void setRuleCacheMap(Map<Integer, Rule> ruleCacheMap) {
|
406
|
this.ruleCacheMap = ruleCacheMap;
|
407
|
}
|
408
|
|
409
|
public String getValidationType() {
|
410
|
return validationType;
|
411
|
}
|
412
|
|
413
|
public void setValidationType(String validationType) {
|
414
|
this.validationType = validationType;
|
415
|
}
|
416
|
|
417
|
public Set<Integer> getBlacklistRuleIds() {
|
418
|
return blacklistRuleIds;
|
419
|
}
|
420
|
|
421
|
public void setBlacklistRuleIds(Set<Integer> blacklistRuleIds) {
|
422
|
this.blacklistRuleIds = blacklistRuleIds;
|
423
|
}
|
424
|
|
425
|
|
426
|
|
427
|
|
428
|
}
|
429
|
|