1
|
package eu.dnetlib.validator.service.impls.listeners;
|
2
|
|
3
|
import java.util.ArrayList;
|
4
|
import java.util.HashMap;
|
5
|
import java.util.List;
|
6
|
import java.util.Map;
|
7
|
|
8
|
import javax.xml.xpath.XPath;
|
9
|
import javax.xml.xpath.XPathConstants;
|
10
|
import javax.xml.xpath.XPathExpressionException;
|
11
|
import javax.xml.xpath.XPathFactory;
|
12
|
|
13
|
import org.apache.log4j.Logger;
|
14
|
import org.springframework.transaction.annotation.Propagation;
|
15
|
import org.springframework.transaction.annotation.Transactional;
|
16
|
import org.w3c.dom.Node;
|
17
|
import org.w3c.dom.NodeList;
|
18
|
|
19
|
import eu.dnetlib.domain.functionality.validator.Rule;
|
20
|
import eu.dnetlib.validator.commons.dao.jobs.JobsDAO;
|
21
|
import eu.dnetlib.validator.commons.dao.rules.RuleStatus;
|
22
|
import eu.dnetlib.validator.commons.dao.rules.RulesDAO;
|
23
|
import eu.dnetlib.validator.commons.dao.tasks.TaskStored;
|
24
|
import eu.dnetlib.validator.commons.dao.tasks.TasksDAO;
|
25
|
import eu.dnetlib.validator.engine.execution.CompletedTask;
|
26
|
import eu.dnetlib.validator.engine.execution.JobListener;
|
27
|
|
28
|
/**
|
29
|
*
|
30
|
* @author Nikon Gasparis
|
31
|
*
|
32
|
*/
|
33
|
public class ValidatorJobListener implements JobListener {
|
34
|
private static Logger logger = Logger.getLogger(ValidatorJobListener.class);
|
35
|
|
36
|
private Integer jobSubmittedId;
|
37
|
private String jobSubmittedUser;
|
38
|
private TasksDAO tasksDao;
|
39
|
private JobsDAO jobsDao;
|
40
|
private RulesDAO rulesDao;
|
41
|
private String validationType;
|
42
|
|
43
|
private int objsValidated = 0;
|
44
|
private int score = 0;
|
45
|
private int internalJobsFinished = 0;
|
46
|
|
47
|
Map<String,Map<Integer,RuleStatus>> scoreMapPerGroupBy = new HashMap<String,Map<Integer,RuleStatus>>();
|
48
|
private String valBaseUrl = null;
|
49
|
|
50
|
private int internalJobsSum = 1;
|
51
|
|
52
|
|
53
|
private Map<Integer, Rule> ruleCacheMap = new HashMap<Integer, Rule>();
|
54
|
private String groupBy_xpath = null;
|
55
|
|
56
|
@Override
|
57
|
@Transactional(propagation = Propagation.REQUIRED)
|
58
|
public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext, Throwable t) {
|
59
|
logger.error("Completed tasks for " + jobId + " (exception). Error: " + t.getMessage());
|
60
|
List<TaskStored> tasksStored = new ArrayList<TaskStored>();
|
61
|
List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
|
62
|
for (CompletedTask ctask : tasks) {
|
63
|
TaskStored taskStored = new TaskStored();
|
64
|
taskStored.setEnded(ctask.finished.toString());
|
65
|
taskStored.setStarted(ctask.started.toString());
|
66
|
taskStored.setJobId(jobSubmittedId);
|
67
|
taskStored.setRecordIdentifier(ctask.valobjId);
|
68
|
taskStored.setRuleId(ctask.ruleId);
|
69
|
taskStored.setSuccess(ctask.success);
|
70
|
taskStored.setStatus("finished");
|
71
|
tasksStored.add(taskStored);
|
72
|
|
73
|
logger.debug("JOBID:"+jobSubmittedId+"# Task-failed: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
|
74
|
" ruleId: "+taskStored.getRuleId() + " error:"+ t.getMessage());
|
75
|
}
|
76
|
this.updateScoreMap(groupBy_values, tasksStored);
|
77
|
// taskStoredDao.saveTasks(tasksStored,groupBy_values);
|
78
|
if (objsValidated % 100 == 0)
|
79
|
jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
|
80
|
objsValidated++;
|
81
|
}
|
82
|
|
83
|
@Override
|
84
|
@Transactional(propagation = Propagation.REQUIRED)
|
85
|
public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext) {
|
86
|
|
87
|
List<Map<String, String>> veloList = new ArrayList<Map<String, String>>();
|
88
|
|
89
|
logger.debug("JOBID:"+jobSubmittedId+"# Updating Completed Tasks");
|
90
|
Map<Integer,Boolean> ruleIdsSuccess = new HashMap<Integer,Boolean>();
|
91
|
Map<String, String> veloMap = null;
|
92
|
List<TaskStored> tasksStored = new ArrayList<TaskStored>();
|
93
|
List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
|
94
|
|
95
|
for (CompletedTask ctask : tasks) {
|
96
|
veloMap = new HashMap<String, String>();
|
97
|
TaskStored taskStored = new TaskStored();
|
98
|
taskStored.setEnded(ctask.finished.toString());
|
99
|
taskStored.setStarted(ctask.started.toString());
|
100
|
taskStored.setJobId(jobSubmittedId);
|
101
|
taskStored.setRecordIdentifier(ctask.valobjId);
|
102
|
taskStored.setRuleId(ctask.ruleId);
|
103
|
taskStored.setSuccess(ctask.success);
|
104
|
taskStored.setStatus("finished");
|
105
|
tasksStored.add(taskStored);
|
106
|
|
107
|
// logger.debug("JOBID:"+jobSubmittedId+"# Task: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
|
108
|
// " ruleId: "+taskStored.getRuleId());
|
109
|
ruleIdsSuccess.put(taskStored.getRuleId(),ctask.success);
|
110
|
|
111
|
// logger.debug("inserted on map: "+ "id: " + taskStored.getRuleId());
|
112
|
if (!ctask.success) {
|
113
|
veloMap.put("id", Integer.toString(taskStored.getRuleId()));
|
114
|
veloMap.put("error", "...");
|
115
|
veloList.add(veloMap);
|
116
|
}
|
117
|
|
118
|
}
|
119
|
this.updateScoreMap(groupBy_values, tasksStored);
|
120
|
|
121
|
// WORKFLOWS BEGIN
|
122
|
int score = this.calculateRecordScore(ruleIdsSuccess);
|
123
|
Map<String,String> recordValidationResult = new HashMap<String,String>();
|
124
|
recordValidationResult.put("score", Integer.toString(score));
|
125
|
recordValidationResult.put("id", tasks.get(0).valobjId);
|
126
|
if (score > 0)
|
127
|
recordValidationResult.put("status", "pass");
|
128
|
else
|
129
|
recordValidationResult.put("status", "fail");
|
130
|
|
131
|
recordContext.put("veloList", (List<Map<String, String>>) veloList);
|
132
|
recordContext.put("recordValidationResult", (Map<String,String>) recordValidationResult);
|
133
|
recordContext.put("score", (int) score);
|
134
|
|
135
|
|
136
|
// WORKFLOWS END
|
137
|
if (objsValidated % 10 == 0)
|
138
|
jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
|
139
|
objsValidated++;
|
140
|
}
|
141
|
|
142
|
@Override
|
143
|
@Transactional(propagation = Propagation.REQUIRED)
|
144
|
public synchronized void finished(int jobId, Map<String,Object> jobContext) {
|
145
|
internalJobsFinished++;
|
146
|
if(internalJobsFinished == internalJobsSum) {
|
147
|
logger.debug("JOBID:"+ this.jobSubmittedId +"# Set job finished");
|
148
|
tasksDao.saveTasks(this.scoreMapPerGroupBy.get("all"));
|
149
|
jobContext.put("jobSubmittedId", (int) jobSubmittedId);
|
150
|
jobContext.put("jobSubmittedUser", (String) jobSubmittedUser);
|
151
|
|
152
|
score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy, null, false, objsValidated, validationType);
|
153
|
logger.debug("score_"+validationType +": " + score);
|
154
|
jobContext.put("score_"+validationType, (int) score);
|
155
|
if (jobSubmittedUser.contains("Workflow")) {
|
156
|
jobsDao.setTotalJobFinished(jobId, null, false);
|
157
|
}
|
158
|
} else {
|
159
|
logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
|
160
|
}
|
161
|
}
|
162
|
|
163
|
@Override
|
164
|
@Transactional(propagation = Propagation.REQUIRED)
|
165
|
public synchronized void failed(int jobId, Map<String,Object> jobContext, Throwable t) {
|
166
|
|
167
|
internalJobsFinished++;
|
168
|
if(internalJobsFinished == internalJobsSum) {
|
169
|
logger.debug("JOBID:"+jobSubmittedId+"# Set job finished-failed");
|
170
|
jobContext.put("jobSubmittedId", (int) jobSubmittedId);
|
171
|
jobContext.put("jobSubmittedUser",(String) jobSubmittedUser);
|
172
|
score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy,t.getMessage(), true, objsValidated, validationType);
|
173
|
jobContext.put("score_"+validationType, (int) 0);
|
174
|
|
175
|
if (jobSubmittedUser.contains("Workflow")) {
|
176
|
jobsDao.setTotalJobFinished(jobId, t.getMessage(), true);
|
177
|
}
|
178
|
} else {
|
179
|
logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
|
180
|
}
|
181
|
}
|
182
|
|
183
|
private synchronized void updateScoreMap(List<String> groupBy_values, List<TaskStored> tasksStored) {
|
184
|
for (String groupBy : groupBy_values) {
|
185
|
logger.debug("updating map for groupBy value: " + groupBy);
|
186
|
Map<Integer,RuleStatus> scoreMapPerRule = null;
|
187
|
if((scoreMapPerRule=this.scoreMapPerGroupBy.get(groupBy)) == null) {
|
188
|
logger.debug("map for groupBy value: " + groupBy + " doesn't exist");
|
189
|
scoreMapPerRule = new HashMap<Integer, RuleStatus>();
|
190
|
} else {
|
191
|
logger.debug("map for groupBy value: " + groupBy + " exists");
|
192
|
}
|
193
|
for (TaskStored task : tasksStored) {
|
194
|
RuleStatus ruleSt = null;
|
195
|
if((ruleSt=scoreMapPerRule.get(task.getRuleId())) == null) {
|
196
|
logger.debug("ruleStatus for rule with id : " + task.getRuleId() + " doesn't exist");
|
197
|
ruleSt = new RuleStatus();
|
198
|
}
|
199
|
Rule rule = null;
|
200
|
if((rule=ruleCacheMap.get(task.getRuleId())) == null) {
|
201
|
rule = rulesDao.get(task.getRuleId());
|
202
|
ruleCacheMap.put(task.getRuleId(), rule);
|
203
|
}
|
204
|
ruleSt.setMandatory(rule.isMandatory());
|
205
|
ruleSt.setWeight(rule.getWeight());
|
206
|
ruleSt.total++;
|
207
|
if (task.getSuccess())
|
208
|
ruleSt.success++;
|
209
|
else {
|
210
|
if (groupBy.equals("all") && ruleSt.getFailedTasks().size() < 10 )
|
211
|
ruleSt.getFailedTasks().add(task);
|
212
|
}
|
213
|
scoreMapPerRule.put(task.getRuleId(), ruleSt);
|
214
|
}
|
215
|
this.scoreMapPerGroupBy.put(groupBy, scoreMapPerRule);
|
216
|
}
|
217
|
}
|
218
|
|
219
|
private int calculateRecordScore(Map<Integer,Boolean> ruleIdsSuccess) {
|
220
|
float score = 0;
|
221
|
float sum = 0;
|
222
|
float weights = 0 ;
|
223
|
for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
|
224
|
// RuleStored rule = ruleStoredDao.get(entry.getKey().toString());
|
225
|
Rule rule = ruleCacheMap.get(entry.getKey());
|
226
|
if (rule.isMandatory()) {
|
227
|
weights += rule.getWeight();
|
228
|
if (entry.getValue()) {
|
229
|
sum += rule.getWeight();
|
230
|
}
|
231
|
}
|
232
|
score = (sum/weights)*100;
|
233
|
|
234
|
}
|
235
|
return (int) score;
|
236
|
}
|
237
|
|
238
|
private synchronized List<String> parseGroupByValues(Object record, String xpath) {
|
239
|
List<String> groupBy_values = null;
|
240
|
logger.debug("groupBy field: "+xpath);
|
241
|
try{
|
242
|
XPathFactory factory = XPathFactory.newInstance();
|
243
|
XPath xPath = factory.newXPath();
|
244
|
NodeList rtl = (NodeList) xPath.evaluate(xpath+"/text()",record ,XPathConstants.NODESET);
|
245
|
groupBy_values = new ArrayList<String>();
|
246
|
if (rtl.getLength() > 0) {
|
247
|
for(int i=0; i<rtl.getLength(); i++) {
|
248
|
Node childNode = rtl.item(i);
|
249
|
groupBy_values.add(childNode.getNodeValue());
|
250
|
logger.debug("value: "+childNode.getNodeValue());
|
251
|
}
|
252
|
groupBy_values.add("all");
|
253
|
} else {
|
254
|
groupBy_values.add("all");
|
255
|
}
|
256
|
|
257
|
} catch (XPathExpressionException e) {
|
258
|
logger.error("error getting object"+ e);
|
259
|
}
|
260
|
return groupBy_values;
|
261
|
}
|
262
|
|
263
|
public Integer getJobSubmittedId() {
|
264
|
return jobSubmittedId;
|
265
|
}
|
266
|
|
267
|
public void setJobSubmittedId(Integer jobSubmittedId) {
|
268
|
this.jobSubmittedId = jobSubmittedId;
|
269
|
}
|
270
|
|
271
|
public String getJobSubmittedUser() {
|
272
|
return jobSubmittedUser;
|
273
|
}
|
274
|
|
275
|
public void setJobSubmittedUser(String jobSubmittedUser) {
|
276
|
this.jobSubmittedUser = jobSubmittedUser;
|
277
|
}
|
278
|
|
279
|
public String getGroupBy_xpath() {
|
280
|
return groupBy_xpath;
|
281
|
}
|
282
|
|
283
|
public void setGroupBy_xpath(String groupBy_xpath) {
|
284
|
this.groupBy_xpath = groupBy_xpath;
|
285
|
}
|
286
|
|
287
|
public String getValBaseUrl() {
|
288
|
return valBaseUrl;
|
289
|
}
|
290
|
|
291
|
public void setValBaseUrl(String valBaseUrl) {
|
292
|
this.valBaseUrl = valBaseUrl;
|
293
|
}
|
294
|
|
295
|
public int getInternalJobsSum() {
|
296
|
return internalJobsSum;
|
297
|
}
|
298
|
|
299
|
public void setInternalJobsSum(int internalJobsSum) {
|
300
|
this.internalJobsSum = internalJobsSum;
|
301
|
}
|
302
|
|
303
|
|
304
|
public Map<String, Map<Integer, RuleStatus>> getScoreMapPerGroupBy() {
|
305
|
return scoreMapPerGroupBy;
|
306
|
}
|
307
|
|
308
|
public void setScoreMapPerGroupBy(
|
309
|
Map<String, Map<Integer, RuleStatus>> scoreMapPerGroupBy) {
|
310
|
this.scoreMapPerGroupBy = scoreMapPerGroupBy;
|
311
|
}
|
312
|
|
313
|
public TasksDAO getTasksDao() {
|
314
|
return tasksDao;
|
315
|
}
|
316
|
|
317
|
public void setTasksDao(TasksDAO tasksDao) {
|
318
|
this.tasksDao = tasksDao;
|
319
|
}
|
320
|
|
321
|
public JobsDAO getJobsDao() {
|
322
|
return jobsDao;
|
323
|
}
|
324
|
|
325
|
public void setJobsDao(JobsDAO jobsDao) {
|
326
|
this.jobsDao = jobsDao;
|
327
|
}
|
328
|
|
329
|
public RulesDAO getRulesDao() {
|
330
|
return rulesDao;
|
331
|
}
|
332
|
|
333
|
public void setRulesDao(RulesDAO rulesDao) {
|
334
|
this.rulesDao = rulesDao;
|
335
|
}
|
336
|
|
337
|
public Map<Integer, Rule> getRuleCacheMap() {
|
338
|
return ruleCacheMap;
|
339
|
}
|
340
|
|
341
|
public void setRuleCacheMap(Map<Integer, Rule> ruleCacheMap) {
|
342
|
this.ruleCacheMap = ruleCacheMap;
|
343
|
}
|
344
|
|
345
|
public String getValidationType() {
|
346
|
return validationType;
|
347
|
}
|
348
|
|
349
|
public void setValidationType(String validationType) {
|
350
|
this.validationType = validationType;
|
351
|
}
|
352
|
}
|
353
|
|