Project

General

Profile

1
package eu.dnetlib.validator.service.impls.listeners;
2

    
3
import java.util.ArrayList;
4
import java.util.HashMap;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.Set;
8

    
9
import javax.xml.xpath.XPath;
10
import javax.xml.xpath.XPathConstants;
11
import javax.xml.xpath.XPathExpressionException;
12
import javax.xml.xpath.XPathFactory;
13

    
14
import org.apache.log4j.Logger;
15
import org.springframework.transaction.annotation.Propagation;
16
import org.springframework.transaction.annotation.Transactional;
17
import org.w3c.dom.Node;
18
import org.w3c.dom.NodeList;
19

    
20
import eu.dnetlib.domain.functionality.validator.Rule;
21
import eu.dnetlib.validator.commons.dao.DaoException;
22
import eu.dnetlib.validator.commons.dao.jobs.JobsDAO;
23
import eu.dnetlib.validator.commons.dao.rules.RuleStatus;
24
import eu.dnetlib.validator.commons.dao.rules.RulesDAO;
25
import eu.dnetlib.validator.commons.dao.tasks.TaskStored;
26
import eu.dnetlib.validator.commons.dao.tasks.TasksDAO;
27
import eu.dnetlib.validator.engine.ValidatorException;
28
import eu.dnetlib.validator.engine.execution.CompletedTask;
29
import eu.dnetlib.validator.engine.execution.JobListener;
30

    
31
/**
32
 * 
33
 * @author Nikon Gasparis
34
 *
35
 */
36
public class ValidatorJobListener implements JobListener {
37
	private static Logger logger = Logger.getLogger(ValidatorJobListener.class);
38

    
39
	private Integer jobSubmittedId;
40
	private String jobSubmittedUser;
41
	private TasksDAO tasksDao;
42
	private JobsDAO jobsDao;
43
	private RulesDAO rulesDao;
44
	private String validationType;
45
	private Set<Integer> blacklistRuleIds;
46
	
47
	private int objsValidated = 0;
48
	private int score = 0;
49
	private int internalJobsFinished = 0;
50
	private int internalJobsSum = 1;
51
	
52
	Map<String,Map<Integer,RuleStatus>> scoreMapPerGroupBy = new HashMap<String,Map<Integer,RuleStatus>>();
53
	private String valBaseUrl = null;
54
	
55

    
56

    
57
	private Map<Integer, Rule> ruleCacheMap = new HashMap<Integer, Rule>();
58
	private String groupBy_xpath = null;
59

    
60
	@Override
61
	@Transactional(propagation = Propagation.REQUIRED)
62
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext, Throwable t) throws ValidatorException {
63
		try {
64
			logger.error("Completed tasks for " + jobId + " (exception). Error: " + t.getMessage());
65
			List<TaskStored> tasksStored = new ArrayList<TaskStored>();
66
			List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
67
			for (CompletedTask ctask : tasks) {
68
				TaskStored taskStored = new TaskStored();
69
				taskStored.setEnded(ctask.finished.toString());
70
				taskStored.setStarted(ctask.started.toString());
71
				taskStored.setJobId(jobSubmittedId);
72
				taskStored.setRecordIdentifier(ctask.valobjId);
73
				taskStored.setRuleId(ctask.ruleId);
74
				taskStored.setSuccess(ctask.success);
75
				taskStored.setStatus("finished");
76
				tasksStored.add(taskStored);
77
				
78
				logger.debug("JOBID:"+jobSubmittedId+"# Task-failed: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
79
						" ruleId: "+taskStored.getRuleId() + " error:"+ t.getMessage());
80
			}
81
			this.updateScoreMap(groupBy_values, tasksStored);
82
			if (objsValidated % 100 == 0)
83
				jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
84
			objsValidated++;		
85
		} catch (Exception e) {
86
			logger.error("Error while proccessing results");
87
			throw new ValidatorException("Error while proccessing results", e);
88
		}
89
	}
90

    
91
	@Override
92
	@Transactional(propagation = Propagation.REQUIRED)
93
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext) throws ValidatorException {
94
		try {
95
			List<Map<String, String>> veloList = new ArrayList<Map<String, String>>();
96
				
97
			logger.debug("JOBID:"+jobSubmittedId+"# Updating Completed Tasks");
98
			Map<Integer,Boolean> ruleIdsSuccess = new HashMap<Integer,Boolean>();
99
			Map<String, String> veloMap = null;
100
			List<TaskStored> tasksStored = new ArrayList<TaskStored>();
101
			List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
102
			
103
			int recordScore = -1;
104
			for (CompletedTask ctask : tasks) {
105
				veloMap = new HashMap<String, String>();
106
				TaskStored taskStored = new TaskStored();
107
				taskStored.setEnded(ctask.finished.toString());
108
				taskStored.setStarted(ctask.started.toString());
109
				taskStored.setJobId(jobSubmittedId);
110
				taskStored.setRecordIdentifier(ctask.valobjId);
111
				taskStored.setRuleId(ctask.ruleId);
112
				taskStored.setSuccess(ctask.success);
113
				taskStored.setStatus("finished");
114
				tasksStored.add(taskStored);
115
				
116
	//			logger.debug("JOBID:"+jobSubmittedId+"# Task: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
117
	//					" ruleId: "+taskStored.getRuleId());
118
				ruleIdsSuccess.put(taskStored.getRuleId(),ctask.success);
119
				
120
	//			logger.debug("inserted on map: "+ "id: " + taskStored.getRuleId());
121
				if (!ctask.success) {
122
					veloMap.put("id", Integer.toString(taskStored.getRuleId()));
123
					veloMap.put("error", "...");
124
				    veloList.add(veloMap);
125
//				    if (scoreTriggerRules.contains(ctask.ruleId)) {
126
//				    	recordScore=0;
127
//				    }
128
				    Rule rule = this.getRule(taskStored.getRuleId());
129
				    String xpath = null;
130
				    veloMap.put("name", rule.getName());
131
				    if (! rule.getType().contains("Chain"))
132
				    	xpath = rule.getConfiguration().getProperty("xpath");
133
				    else {
134
				    	xpath = this.getRule(Integer.parseInt(rule.getConfiguration().getProperty("rule_2"))).getConfiguration().getProperty("xpath");
135
				    }
136
				    veloMap.put("xpath", xpath);
137
				    veloMap.put("mandatory", Boolean.toString(rule.isMandatory()));
138
				}
139
					
140
			}
141
			this.updateScoreMap(groupBy_values, tasksStored);
142
	
143
			// WORKFLOWS BEGIN
144
			int recordBlacklistScore = -1;
145
			if (recordScore == -1) {
146
				recordScore = this.calculateRecordScore(ruleIdsSuccess);
147
				if (blacklistRuleIds != null )
148
					recordBlacklistScore = this.calculateBlacklistRecordScore(ruleIdsSuccess, blacklistRuleIds);
149
			}
150
			
151
			Map<String,String> recordValidationResult = new HashMap<String,String>();
152
			recordValidationResult.put("score", Integer.toString(score));
153
			recordValidationResult.put("id", tasks.get(0).valobjId);
154
			if (score > 0)
155
				recordValidationResult.put("status", "pass");
156
			else
157
				recordValidationResult.put("status", "fail");
158
			
159
			recordContext.put("veloList", (List<Map<String, String>>) veloList);
160
			recordContext.put("recordValidationResult", (Map<String,String>) recordValidationResult);
161
			recordContext.put("score", (int) score);
162
			recordContext.put("recordBlacklistScore", (int) recordBlacklistScore);
163
			
164
			// WORKFLOWS END
165
			if (objsValidated % 100 == 0)
166
				jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
167
			objsValidated++;
168
		} catch (Exception e) {
169
			logger.error("Error while proccessing results");
170
			throw new ValidatorException("Error while proccessing results", e);
171
		}
172
	}
173

    
174
	@Override
175
	@Transactional(propagation = Propagation.REQUIRED)
176
	public synchronized void finished(int jobId, Map<String,Object> jobContext) {
177
		try {
178
			internalJobsFinished++;
179
			if(internalJobsFinished == internalJobsSum) { 
180
				logger.debug("JOBID:"+ this.jobSubmittedId +"# Set job finished");
181
				jobContext.put("jobSubmittedId", (int) jobSubmittedId);
182
				jobContext.put("jobSubmittedUser", (String) jobSubmittedUser);
183
	
184
				score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy, null, false, objsValidated, validationType);
185
				logger.debug("score_"+validationType +": " + score);
186
				jobContext.put("score_"+validationType, (int) score);
187
				tasksDao.saveTasks(this.scoreMapPerGroupBy.get("all"));
188
				if (jobSubmittedUser.contains("Workflow")) {
189
					jobsDao.setTotalJobFinished(jobId, null, false);
190
				}
191
			} else {
192
				logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
193
			}
194
		} catch (Exception e) {
195
			logger.error("Error while finalizing successfull job");
196
		}
197
	}
198
	
199
	@Override
200
	@Transactional(propagation = Propagation.REQUIRED)
201
	public synchronized void failed(int jobId, Map<String,Object> jobContext, Throwable t) {
202
		try {
203
			internalJobsFinished++;
204
			if(internalJobsFinished == internalJobsSum) { 		
205
				logger.debug("JOBID:"+jobSubmittedId+"# Set job finished-failed");
206
				jobContext.put("jobSubmittedId", (int) jobSubmittedId);
207
				jobContext.put("jobSubmittedUser",(String) jobSubmittedUser);
208
				score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy,t.getMessage(), true, objsValidated, validationType);
209
				jobContext.put("score_"+validationType, (int) 0);
210
				
211
				if (jobSubmittedUser.contains("Workflow")) {
212
					jobsDao.setTotalJobFinished(jobId, t.getMessage(), true);
213
				}
214
			} else {
215
				logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
216
			}
217
		} catch (Exception e) {
218
			logger.error("Error while finalizing failed job");
219
		}
220
	}
221
			
222
	private synchronized void updateScoreMap(List<String> groupBy_values, List<TaskStored> tasksStored) throws Exception {
223
		for (String groupBy : groupBy_values) {
224
			logger.debug("updating map for groupBy value: " + groupBy);
225
			Map<Integer,RuleStatus> scoreMapPerRule = null;
226
			if((scoreMapPerRule=this.scoreMapPerGroupBy.get(groupBy)) == null) {
227
				logger.debug("map for groupBy value: " + groupBy + " doesn't exist");
228
				scoreMapPerRule = new HashMap<Integer, RuleStatus>();
229
			} else {
230
				logger.debug("map for groupBy value: " + groupBy + " exists");
231
			}
232
			for (TaskStored task : tasksStored) {
233
				RuleStatus ruleSt = null;
234
				if((ruleSt=scoreMapPerRule.get(task.getRuleId())) == null) {
235
					logger.debug("ruleStatus for rule with id : " + task.getRuleId() + " doesn't exist");
236
					ruleSt = new RuleStatus();
237
				} 
238
				Rule rule = null;
239
				if((rule=ruleCacheMap.get(task.getRuleId())) == null) {
240
					rule = rulesDao.get(task.getRuleId());
241
					ruleCacheMap.put(task.getRuleId(), rule);
242
				}
243
				ruleSt.setMandatory(rule.isMandatory());
244
				ruleSt.setWeight(rule.getWeight());
245
				ruleSt.total++;
246
				if (task.getSuccess())
247
					ruleSt.success++;
248
				else {
249
					if (groupBy.equals("all") && ruleSt.getFailedTasks().size() < 10 )
250
						ruleSt.getFailedTasks().add(task);
251
				}
252
				scoreMapPerRule.put(task.getRuleId(), ruleSt);
253
			}
254
			this.scoreMapPerGroupBy.put(groupBy, scoreMapPerRule);
255
		}
256
	} 
257
	
258
	private int calculateRecordScore(Map<Integer,Boolean> ruleIdsSuccess) throws Exception {
259
		float score = 0;
260
		float sum = 0;
261
		float weights = 0 ;
262
		for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
263
			Rule rule = ruleCacheMap.get(entry.getKey());
264
			if (rule.isMandatory()) {
265
				weights += rule.getWeight();
266
				if (entry.getValue()) {
267
					sum += rule.getWeight();
268
				}
269
			}
270
			score = (sum/weights)*100;
271
			
272
		}
273
		return (int) score;
274
	}
275
	
276
	private int calculateBlacklistRecordScore(Map<Integer,Boolean> ruleIdsSuccess, Set<Integer> blacklistRuleIds) throws Exception {
277
		float score = 0;
278
		float sum = 0;
279
		float weights = 0 ;
280
		for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
281
			Rule rule = ruleCacheMap.get(entry.getKey());
282
			if (rule.isMandatory() && blacklistRuleIds.contains(entry.getKey())) {
283
				weights += rule.getWeight();
284
				if (entry.getValue()) {
285
					sum += rule.getWeight();
286
				}
287
			}
288
			score = (sum/weights)*100;
289
		}
290
		return (int) score;
291
	}
292

    
293
	private synchronized List<String> parseGroupByValues(Object record, String xpath) {
294
		List<String> groupBy_values = null;
295
		logger.debug("groupBy field: "+xpath);
296
		try{
297
			XPathFactory factory = XPathFactory.newInstance();
298
	        XPath xPath = factory.newXPath();
299
	        NodeList rtl = (NodeList) xPath.evaluate(xpath+"/text()",record ,XPathConstants.NODESET);
300
	        groupBy_values = new ArrayList<String>();
301
	        if (rtl.getLength() > 0) {
302
	        	for(int i=0; i<rtl.getLength(); i++) {
303
	        		Node childNode = rtl.item(i);
304
	        		groupBy_values.add(childNode.getNodeValue());
305
	        		logger.debug("value: "+childNode.getNodeValue());
306
	        	}
307
	        	groupBy_values.add("all");
308
	        } else {
309
	        	groupBy_values.add("all");
310
	        }
311
	        	
312
	 	} catch (XPathExpressionException e) {
313
			logger.error("error getting object"+ e);
314
	 	}
315
		return groupBy_values;
316
	}
317
	
318
	private Rule getRule(int id) throws DaoException {
319
		Rule rule = null;
320
		if((rule=ruleCacheMap.get(id)) == null) {
321
			rule = rulesDao.get(id);
322
			ruleCacheMap.put(id, rule);
323
		}
324
		return rule;
325
		
326
	}
327
	public Integer getJobSubmittedId() {
328
		return jobSubmittedId;
329
	}
330

    
331
	public void setJobSubmittedId(Integer jobSubmittedId) {
332
		this.jobSubmittedId = jobSubmittedId;
333
	}
334

    
335
	public String getJobSubmittedUser() {
336
		return jobSubmittedUser;
337
	}
338

    
339
	public void setJobSubmittedUser(String jobSubmittedUser) {
340
		this.jobSubmittedUser = jobSubmittedUser;
341
	}
342

    
343
	public String getGroupBy_xpath() {
344
		return groupBy_xpath;
345
	}
346

    
347
	public void setGroupBy_xpath(String groupBy_xpath) {
348
		this.groupBy_xpath = groupBy_xpath;
349
	}
350

    
351
	public String getValBaseUrl() {
352
		return valBaseUrl;
353
	}
354

    
355
	public void setValBaseUrl(String valBaseUrl) {
356
		this.valBaseUrl = valBaseUrl;
357
	}
358

    
359
	public int getInternalJobsSum() {
360
		return internalJobsSum;
361
	}
362

    
363
	public void setInternalJobsSum(int internalJobsSum) {
364
		this.internalJobsSum = internalJobsSum;
365
	}
366

    
367

    
368
	public Map<String, Map<Integer, RuleStatus>> getScoreMapPerGroupBy() {
369
		return scoreMapPerGroupBy;
370
	}
371

    
372
	public void setScoreMapPerGroupBy(
373
			Map<String, Map<Integer, RuleStatus>> scoreMapPerGroupBy) {
374
		this.scoreMapPerGroupBy = scoreMapPerGroupBy;
375
	}
376

    
377
	public TasksDAO getTasksDao() {
378
		return tasksDao;
379
	}
380

    
381
	public void setTasksDao(TasksDAO tasksDao) {
382
		this.tasksDao = tasksDao;
383
	}
384

    
385
	public JobsDAO getJobsDao() {
386
		return jobsDao;
387
	}
388

    
389
	public void setJobsDao(JobsDAO jobsDao) {
390
		this.jobsDao = jobsDao;
391
	}
392

    
393
	public RulesDAO getRulesDao() {
394
		return rulesDao;
395
	}
396

    
397
	public void setRulesDao(RulesDAO rulesDao) {
398
		this.rulesDao = rulesDao;
399
	}
400

    
401
	public Map<Integer, Rule> getRuleCacheMap() {
402
		return ruleCacheMap;
403
	}
404

    
405
	public void setRuleCacheMap(Map<Integer, Rule> ruleCacheMap) {
406
		this.ruleCacheMap = ruleCacheMap;
407
	}
408

    
409
	public String getValidationType() {
410
		return validationType;
411
	}
412

    
413
	public void setValidationType(String validationType) {
414
		this.validationType = validationType;
415
	}
416

    
417
	public Set<Integer> getBlacklistRuleIds() {
418
		return blacklistRuleIds;
419
	}
420

    
421
	public void setBlacklistRuleIds(Set<Integer> blacklistRuleIds) {
422
		this.blacklistRuleIds = blacklistRuleIds;
423
	}
424

    
425
	
426
	
427
	
428
}
429

    
(8-8/8)