Project

General

Profile

1 34342 nikon.gasp
package eu.dnetlib.validator.service.impls.listeners;
2 17673 nikon.gasp
3 17875 nikon.gasp
import java.util.ArrayList;
4 17710 nikon.gasp
import java.util.HashMap;
5 17673 nikon.gasp
import java.util.List;
6 17710 nikon.gasp
import java.util.Map;
7 35423 nikon.gasp
import java.util.Set;
8 17673 nikon.gasp
9 18252 nikon.gasp
import javax.xml.xpath.XPath;
10
import javax.xml.xpath.XPathConstants;
11
import javax.xml.xpath.XPathExpressionException;
12
import javax.xml.xpath.XPathFactory;
13
14 19683 nikon.gasp
import org.apache.log4j.Logger;
15
import org.springframework.transaction.annotation.Propagation;
16
import org.springframework.transaction.annotation.Transactional;
17 18252 nikon.gasp
import org.w3c.dom.Node;
18
import org.w3c.dom.NodeList;
19 19683 nikon.gasp
20 34342 nikon.gasp
import eu.dnetlib.domain.functionality.validator.Rule;
21 37040 nikon.gasp
import eu.dnetlib.validator.commons.dao.DaoException;
22 34342 nikon.gasp
import eu.dnetlib.validator.commons.dao.jobs.JobsDAO;
23
import eu.dnetlib.validator.commons.dao.rules.RuleStatus;
24
import eu.dnetlib.validator.commons.dao.rules.RulesDAO;
25
import eu.dnetlib.validator.commons.dao.tasks.TaskStored;
26
import eu.dnetlib.validator.commons.dao.tasks.TasksDAO;
27 37040 nikon.gasp
import eu.dnetlib.validator.engine.ValidatorException;
28 34342 nikon.gasp
import eu.dnetlib.validator.engine.execution.CompletedTask;
29
import eu.dnetlib.validator.engine.execution.JobListener;
30
31 17673 nikon.gasp
/**
32
 *
33
 * @author Nikon Gasparis
34
 *
35
 */
36
public class ValidatorJobListener implements JobListener {
37 17710 nikon.gasp
	private static Logger logger = Logger.getLogger(ValidatorJobListener.class);
38 17673 nikon.gasp
39 17710 nikon.gasp
	private Integer jobSubmittedId;
40 18398 nikon.gasp
	private String jobSubmittedUser;
41 34342 nikon.gasp
	private TasksDAO tasksDao;
42
	private JobsDAO jobsDao;
43
	private RulesDAO rulesDao;
44
	private String validationType;
45 41622 nikon.gasp
	private Set<Integer> blacklistRuleIds;
46 18398 nikon.gasp
47 17875 nikon.gasp
	private int objsValidated = 0;
48 18398 nikon.gasp
	private int score = 0;
49 18483 nikon.gasp
	private int internalJobsFinished = 0;
50 35423 nikon.gasp
	private int internalJobsSum = 1;
51 34342 nikon.gasp
52
	Map<String,Map<Integer,RuleStatus>> scoreMapPerGroupBy = new HashMap<String,Map<Integer,RuleStatus>>();
53 18398 nikon.gasp
	private String valBaseUrl = null;
54
55
56
57 34342 nikon.gasp
	private Map<Integer, Rule> ruleCacheMap = new HashMap<Integer, Rule>();
58 18311 nikon.gasp
	private String groupBy_xpath = null;
59 17710 nikon.gasp
60 17673 nikon.gasp
	@Override
61 17710 nikon.gasp
	@Transactional(propagation = Propagation.REQUIRED)
62 37040 nikon.gasp
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext, Throwable t) throws ValidatorException {
63
		try {
64
			logger.error("Completed tasks for " + jobId + " (exception). Error: " + t.getMessage());
65
			List<TaskStored> tasksStored = new ArrayList<TaskStored>();
66
			List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
67
			for (CompletedTask ctask : tasks) {
68
				TaskStored taskStored = new TaskStored();
69
				taskStored.setEnded(ctask.finished.toString());
70
				taskStored.setStarted(ctask.started.toString());
71
				taskStored.setJobId(jobSubmittedId);
72
				taskStored.setRecordIdentifier(ctask.valobjId);
73
				taskStored.setRuleId(ctask.ruleId);
74
				taskStored.setSuccess(ctask.success);
75
				taskStored.setStatus("finished");
76
				tasksStored.add(taskStored);
77
78
				logger.debug("JOBID:"+jobSubmittedId+"# Task-failed: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
79
						" ruleId: "+taskStored.getRuleId() + " error:"+ t.getMessage());
80
			}
81
			this.updateScoreMap(groupBy_values, tasksStored);
82
			if (objsValidated % 100 == 0)
83
				jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
84
			objsValidated++;
85
		} catch (Exception e) {
86
			logger.error("Error while proccessing results");
87
			throw new ValidatorException("Error while proccessing results", e);
88 17673 nikon.gasp
		}
89
	}
90
91
	@Override
92 17710 nikon.gasp
	@Transactional(propagation = Propagation.REQUIRED)
93 37040 nikon.gasp
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext) throws ValidatorException {
94
		try {
95
			List<Map<String, String>> veloList = new ArrayList<Map<String, String>>();
96
97
			logger.debug("JOBID:"+jobSubmittedId+"# Updating Completed Tasks");
98
			Map<Integer,Boolean> ruleIdsSuccess = new HashMap<Integer,Boolean>();
99
			Map<String, String> veloMap = null;
100
			List<TaskStored> tasksStored = new ArrayList<TaskStored>();
101
			List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
102 19683 nikon.gasp
103 37040 nikon.gasp
			int recordScore = -1;
104
			for (CompletedTask ctask : tasks) {
105
				veloMap = new HashMap<String, String>();
106
				TaskStored taskStored = new TaskStored();
107
				taskStored.setEnded(ctask.finished.toString());
108
				taskStored.setStarted(ctask.started.toString());
109
				taskStored.setJobId(jobSubmittedId);
110
				taskStored.setRecordIdentifier(ctask.valobjId);
111
				taskStored.setRuleId(ctask.ruleId);
112
				taskStored.setSuccess(ctask.success);
113
				taskStored.setStatus("finished");
114
				tasksStored.add(taskStored);
115
116
	//			logger.debug("JOBID:"+jobSubmittedId+"# Task: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
117
	//					" ruleId: "+taskStored.getRuleId());
118
				ruleIdsSuccess.put(taskStored.getRuleId(),ctask.success);
119
120
	//			logger.debug("inserted on map: "+ "id: " + taskStored.getRuleId());
121
				if (!ctask.success) {
122
					veloMap.put("id", Integer.toString(taskStored.getRuleId()));
123
					veloMap.put("error", "...");
124
				    veloList.add(veloMap);
125 41622 nikon.gasp
//				    if (scoreTriggerRules.contains(ctask.ruleId)) {
126
//				    	recordScore=0;
127
//				    }
128 37040 nikon.gasp
				    Rule rule = this.getRule(taskStored.getRuleId());
129
				    String xpath = null;
130
				    veloMap.put("name", rule.getName());
131
				    if (! rule.getType().contains("Chain"))
132
				    	xpath = rule.getConfiguration().getProperty("xpath");
133
				    else {
134
				    	xpath = this.getRule(Integer.parseInt(rule.getConfiguration().getProperty("rule_2"))).getConfiguration().getProperty("xpath");
135
				    }
136
				    veloMap.put("xpath", xpath);
137
				    veloMap.put("mandatory", Boolean.toString(rule.isMandatory()));
138
				}
139
140
			}
141
			this.updateScoreMap(groupBy_values, tasksStored);
142
143
			// WORKFLOWS BEGIN
144 41622 nikon.gasp
			int recordBlacklistScore = -1;
145
			if (recordScore == -1) {
146 37040 nikon.gasp
				recordScore = this.calculateRecordScore(ruleIdsSuccess);
147 41622 nikon.gasp
				if (blacklistRuleIds != null )
148
					recordBlacklistScore = this.calculateBlacklistRecordScore(ruleIdsSuccess, blacklistRuleIds);
149
			}
150
151 37040 nikon.gasp
			Map<String,String> recordValidationResult = new HashMap<String,String>();
152
			recordValidationResult.put("score", Integer.toString(score));
153
			recordValidationResult.put("id", tasks.get(0).valobjId);
154
			if (score > 0)
155
				recordValidationResult.put("status", "pass");
156
			else
157
				recordValidationResult.put("status", "fail");
158 19683 nikon.gasp
159 37040 nikon.gasp
			recordContext.put("veloList", (List<Map<String, String>>) veloList);
160
			recordContext.put("recordValidationResult", (Map<String,String>) recordValidationResult);
161
			recordContext.put("score", (int) score);
162 41622 nikon.gasp
			recordContext.put("recordBlacklistScore", (int) recordBlacklistScore);
163 31252 nikon.gasp
164 37040 nikon.gasp
			// WORKFLOWS END
165
			if (objsValidated % 100 == 0)
166
				jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
167
			objsValidated++;
168
		} catch (Exception e) {
169
			logger.error("Error while proccessing results");
170
			throw new ValidatorException("Error while proccessing results", e);
171 17673 nikon.gasp
		}
172
	}
173
174 19954 nikon.gasp
	@Override
175
	@Transactional(propagation = Propagation.REQUIRED)
176
	public synchronized void finished(int jobId, Map<String,Object> jobContext) {
177 37040 nikon.gasp
		try {
178
			internalJobsFinished++;
179
			if(internalJobsFinished == internalJobsSum) {
180
				logger.debug("JOBID:"+ this.jobSubmittedId +"# Set job finished");
181
				jobContext.put("jobSubmittedId", (int) jobSubmittedId);
182
				jobContext.put("jobSubmittedUser", (String) jobSubmittedUser);
183
184
				score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy, null, false, objsValidated, validationType);
185
				logger.debug("score_"+validationType +": " + score);
186
				jobContext.put("score_"+validationType, (int) score);
187
				tasksDao.saveTasks(this.scoreMapPerGroupBy.get("all"));
188
				if (jobSubmittedUser.contains("Workflow")) {
189
					jobsDao.setTotalJobFinished(jobId, null, false);
190
				}
191
			} else {
192
				logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
193 34342 nikon.gasp
			}
194 37040 nikon.gasp
		} catch (Exception e) {
195
			logger.error("Error while finalizing successfull job");
196 19954 nikon.gasp
		}
197
	}
198
199
	@Override
200
	@Transactional(propagation = Propagation.REQUIRED)
201
	public synchronized void failed(int jobId, Map<String,Object> jobContext, Throwable t) {
202 37040 nikon.gasp
		try {
203
			internalJobsFinished++;
204
			if(internalJobsFinished == internalJobsSum) {
205
				logger.debug("JOBID:"+jobSubmittedId+"# Set job finished-failed");
206
				jobContext.put("jobSubmittedId", (int) jobSubmittedId);
207
				jobContext.put("jobSubmittedUser",(String) jobSubmittedUser);
208
				score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy,t.getMessage(), true, objsValidated, validationType);
209
				jobContext.put("score_"+validationType, (int) 0);
210
211
				if (jobSubmittedUser.contains("Workflow")) {
212
					jobsDao.setTotalJobFinished(jobId, t.getMessage(), true);
213
				}
214
			} else {
215
				logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
216 34342 nikon.gasp
			}
217 37040 nikon.gasp
		} catch (Exception e) {
218
			logger.error("Error while finalizing failed job");
219 19954 nikon.gasp
		}
220
	}
221
222 37040 nikon.gasp
	private synchronized void updateScoreMap(List<String> groupBy_values, List<TaskStored> tasksStored) throws Exception {
223 31326 nikon.gasp
		for (String groupBy : groupBy_values) {
224
			logger.debug("updating map for groupBy value: " + groupBy);
225 34342 nikon.gasp
			Map<Integer,RuleStatus> scoreMapPerRule = null;
226 31326 nikon.gasp
			if((scoreMapPerRule=this.scoreMapPerGroupBy.get(groupBy)) == null) {
227
				logger.debug("map for groupBy value: " + groupBy + " doesn't exist");
228 34342 nikon.gasp
				scoreMapPerRule = new HashMap<Integer, RuleStatus>();
229 31326 nikon.gasp
			} else {
230
				logger.debug("map for groupBy value: " + groupBy + " exists");
231
			}
232
			for (TaskStored task : tasksStored) {
233
				RuleStatus ruleSt = null;
234
				if((ruleSt=scoreMapPerRule.get(task.getRuleId())) == null) {
235
					logger.debug("ruleStatus for rule with id : " + task.getRuleId() + " doesn't exist");
236
					ruleSt = new RuleStatus();
237
				}
238 34342 nikon.gasp
				Rule rule = null;
239 31326 nikon.gasp
				if((rule=ruleCacheMap.get(task.getRuleId())) == null) {
240 34342 nikon.gasp
					rule = rulesDao.get(task.getRuleId());
241 31326 nikon.gasp
					ruleCacheMap.put(task.getRuleId(), rule);
242
				}
243 34781 nikon.gasp
				ruleSt.setMandatory(rule.isMandatory());
244
				ruleSt.setWeight(rule.getWeight());
245 31326 nikon.gasp
				ruleSt.total++;
246
				if (task.getSuccess())
247
					ruleSt.success++;
248
				else {
249
					if (groupBy.equals("all") && ruleSt.getFailedTasks().size() < 10 )
250
						ruleSt.getFailedTasks().add(task);
251
				}
252
				scoreMapPerRule.put(task.getRuleId(), ruleSt);
253
			}
254
			this.scoreMapPerGroupBy.put(groupBy, scoreMapPerRule);
255
		}
256
	}
257
258 37040 nikon.gasp
	private int calculateRecordScore(Map<Integer,Boolean> ruleIdsSuccess) throws Exception {
259 19683 nikon.gasp
		float score = 0;
260
		float sum = 0;
261
		float weights = 0 ;
262
		for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
263 34342 nikon.gasp
			Rule rule = ruleCacheMap.get(entry.getKey());
264 34781 nikon.gasp
			if (rule.isMandatory()) {
265
				weights += rule.getWeight();
266 19683 nikon.gasp
				if (entry.getValue()) {
267 34781 nikon.gasp
					sum += rule.getWeight();
268 19683 nikon.gasp
				}
269
			}
270
			score = (sum/weights)*100;
271
272
		}
273
		return (int) score;
274
	}
275 41622 nikon.gasp
276
	private int calculateBlacklistRecordScore(Map<Integer,Boolean> ruleIdsSuccess, Set<Integer> blacklistRuleIds) throws Exception {
277
		float score = 0;
278
		float sum = 0;
279
		float weights = 0 ;
280
		for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
281
			Rule rule = ruleCacheMap.get(entry.getKey());
282
			if (rule.isMandatory() && blacklistRuleIds.contains(entry.getKey())) {
283
				weights += rule.getWeight();
284
				if (entry.getValue()) {
285
					sum += rule.getWeight();
286
				}
287
			}
288
			score = (sum/weights)*100;
289
		}
290
		return (int) score;
291
	}
292 19683 nikon.gasp
293 31326 nikon.gasp
	private synchronized List<String> parseGroupByValues(Object record, String xpath) {
294 18311 nikon.gasp
		List<String> groupBy_values = null;
295
		logger.debug("groupBy field: "+xpath);
296 18252 nikon.gasp
		try{
297
			XPathFactory factory = XPathFactory.newInstance();
298
	        XPath xPath = factory.newXPath();
299 18311 nikon.gasp
	        NodeList rtl = (NodeList) xPath.evaluate(xpath+"/text()",record ,XPathConstants.NODESET);
300 18315 nikon.gasp
	        groupBy_values = new ArrayList<String>();
301 18252 nikon.gasp
	        if (rtl.getLength() > 0) {
302 18315 nikon.gasp
	        	for(int i=0; i<rtl.getLength(); i++) {
303 18252 nikon.gasp
	        		Node childNode = rtl.item(i);
304 18311 nikon.gasp
	        		groupBy_values.add(childNode.getNodeValue());
305
	        		logger.debug("value: "+childNode.getNodeValue());
306 18252 nikon.gasp
	        	}
307 31252 nikon.gasp
	        	groupBy_values.add("all");
308 18315 nikon.gasp
	        } else {
309
	        	groupBy_values.add("all");
310 18252 nikon.gasp
	        }
311 18315 nikon.gasp
312 18252 nikon.gasp
	 	} catch (XPathExpressionException e) {
313
			logger.error("error getting object"+ e);
314
	 	}
315 18311 nikon.gasp
		return groupBy_values;
316 18252 nikon.gasp
	}
317 18398 nikon.gasp
318 37040 nikon.gasp
	private Rule getRule(int id) throws DaoException {
319 35734 nikon.gasp
		Rule rule = null;
320
		if((rule=ruleCacheMap.get(id)) == null) {
321
			rule = rulesDao.get(id);
322
			ruleCacheMap.put(id, rule);
323
		}
324
		return rule;
325
326
	}
327 17710 nikon.gasp
	public Integer getJobSubmittedId() {
328
		return jobSubmittedId;
329
	}
330 17673 nikon.gasp
331 17710 nikon.gasp
	public void setJobSubmittedId(Integer jobSubmittedId) {
332
		this.jobSubmittedId = jobSubmittedId;
333 17673 nikon.gasp
	}
334
335 18398 nikon.gasp
	public String getJobSubmittedUser() {
336
		return jobSubmittedUser;
337
	}
338
339
	public void setJobSubmittedUser(String jobSubmittedUser) {
340
		this.jobSubmittedUser = jobSubmittedUser;
341
	}
342
343 34342 nikon.gasp
	public String getGroupBy_xpath() {
344
		return groupBy_xpath;
345 17710 nikon.gasp
	}
346 17673 nikon.gasp
347 34342 nikon.gasp
	public void setGroupBy_xpath(String groupBy_xpath) {
348
		this.groupBy_xpath = groupBy_xpath;
349 17673 nikon.gasp
	}
350
351 34342 nikon.gasp
	public String getValBaseUrl() {
352
		return valBaseUrl;
353 17710 nikon.gasp
	}
354 17673 nikon.gasp
355 34342 nikon.gasp
	public void setValBaseUrl(String valBaseUrl) {
356
		this.valBaseUrl = valBaseUrl;
357 17673 nikon.gasp
	}
358
359 34342 nikon.gasp
	public int getInternalJobsSum() {
360
		return internalJobsSum;
361 17875 nikon.gasp
	}
362
363 34342 nikon.gasp
	public void setInternalJobsSum(int internalJobsSum) {
364
		this.internalJobsSum = internalJobsSum;
365 17875 nikon.gasp
	}
366
367 34342 nikon.gasp
368
	public Map<String, Map<Integer, RuleStatus>> getScoreMapPerGroupBy() {
369
		return scoreMapPerGroupBy;
370 17673 nikon.gasp
	}
371
372 34342 nikon.gasp
	public void setScoreMapPerGroupBy(
373
			Map<String, Map<Integer, RuleStatus>> scoreMapPerGroupBy) {
374
		this.scoreMapPerGroupBy = scoreMapPerGroupBy;
375 17673 nikon.gasp
	}
376 18311 nikon.gasp
377 34342 nikon.gasp
	public TasksDAO getTasksDao() {
378
		return tasksDao;
379 18311 nikon.gasp
	}
380
381 34342 nikon.gasp
	public void setTasksDao(TasksDAO tasksDao) {
382
		this.tasksDao = tasksDao;
383 18311 nikon.gasp
	}
384
385 34342 nikon.gasp
	public JobsDAO getJobsDao() {
386
		return jobsDao;
387 18483 nikon.gasp
	}
388
389 34342 nikon.gasp
	public void setJobsDao(JobsDAO jobsDao) {
390
		this.jobsDao = jobsDao;
391 18483 nikon.gasp
	}
392
393 34342 nikon.gasp
	public RulesDAO getRulesDao() {
394
		return rulesDao;
395 18483 nikon.gasp
	}
396
397 34342 nikon.gasp
	public void setRulesDao(RulesDAO rulesDao) {
398
		this.rulesDao = rulesDao;
399 18483 nikon.gasp
	}
400 31252 nikon.gasp
401 34342 nikon.gasp
	public Map<Integer, Rule> getRuleCacheMap() {
402 31252 nikon.gasp
		return ruleCacheMap;
403
	}
404
405 34342 nikon.gasp
	public void setRuleCacheMap(Map<Integer, Rule> ruleCacheMap) {
406 31252 nikon.gasp
		this.ruleCacheMap = ruleCacheMap;
407
	}
408 34342 nikon.gasp
409
	public String getValidationType() {
410
		return validationType;
411
	}
412
413
	public void setValidationType(String validationType) {
414
		this.validationType = validationType;
415
	}
416 35423 nikon.gasp
417 41622 nikon.gasp
	public Set<Integer> getBlacklistRuleIds() {
418
		return blacklistRuleIds;
419 35423 nikon.gasp
	}
420
421 41622 nikon.gasp
	public void setBlacklistRuleIds(Set<Integer> blacklistRuleIds) {
422
		this.blacklistRuleIds = blacklistRuleIds;
423 35423 nikon.gasp
	}
424 41622 nikon.gasp
425 35423 nikon.gasp
426
427 41622 nikon.gasp
428 18252 nikon.gasp
}