Project

General

Profile

1 34342 nikon.gasp
package eu.dnetlib.validator.service.impls.listeners;
2 17673 nikon.gasp
3 17875 nikon.gasp
import java.util.ArrayList;
4 17710 nikon.gasp
import java.util.HashMap;
5 17673 nikon.gasp
import java.util.List;
6 17710 nikon.gasp
import java.util.Map;
7 17673 nikon.gasp
8 18252 nikon.gasp
import javax.xml.xpath.XPath;
9
import javax.xml.xpath.XPathConstants;
10
import javax.xml.xpath.XPathExpressionException;
11
import javax.xml.xpath.XPathFactory;
12
13 19683 nikon.gasp
import org.apache.log4j.Logger;
14
import org.springframework.transaction.annotation.Propagation;
15
import org.springframework.transaction.annotation.Transactional;
16 18252 nikon.gasp
import org.w3c.dom.Node;
17
import org.w3c.dom.NodeList;
18 19683 nikon.gasp
19 34342 nikon.gasp
import eu.dnetlib.domain.functionality.validator.Rule;
20
import eu.dnetlib.validator.commons.dao.jobs.JobsDAO;
21
import eu.dnetlib.validator.commons.dao.rules.RuleStatus;
22
import eu.dnetlib.validator.commons.dao.rules.RulesDAO;
23
import eu.dnetlib.validator.commons.dao.tasks.TaskStored;
24
import eu.dnetlib.validator.commons.dao.tasks.TasksDAO;
25
import eu.dnetlib.validator.engine.execution.CompletedTask;
26
import eu.dnetlib.validator.engine.execution.JobListener;
27
28 17673 nikon.gasp
/**
29
 *
30
 * @author Nikon Gasparis
31
 *
32
 */
33
public class ValidatorJobListener implements JobListener {
34 17710 nikon.gasp
	private static Logger logger = Logger.getLogger(ValidatorJobListener.class);
35 17673 nikon.gasp
36 17710 nikon.gasp
	private Integer jobSubmittedId;
37 18398 nikon.gasp
	private String jobSubmittedUser;
38 34342 nikon.gasp
	private TasksDAO tasksDao;
39
	private JobsDAO jobsDao;
40
	private RulesDAO rulesDao;
41
	private String validationType;
42 18398 nikon.gasp
43 17875 nikon.gasp
	private int objsValidated = 0;
44 18398 nikon.gasp
	private int score = 0;
45 18483 nikon.gasp
	private int internalJobsFinished = 0;
46 34342 nikon.gasp
47
	Map<String,Map<Integer,RuleStatus>> scoreMapPerGroupBy = new HashMap<String,Map<Integer,RuleStatus>>();
48 18398 nikon.gasp
	private String valBaseUrl = null;
49
50 18483 nikon.gasp
	private int internalJobsSum = 1;
51 18398 nikon.gasp
52
53 34342 nikon.gasp
	private Map<Integer, Rule> ruleCacheMap = new HashMap<Integer, Rule>();
54 18311 nikon.gasp
	private String groupBy_xpath = null;
55 17710 nikon.gasp
56 17673 nikon.gasp
	@Override
57 17710 nikon.gasp
	@Transactional(propagation = Propagation.REQUIRED)
58 19954 nikon.gasp
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext, Throwable t) {
59
		logger.error("Completed tasks for " + jobId + " (exception). Error: " + t.getMessage());
60 17928 nikon.gasp
		List<TaskStored> tasksStored = new ArrayList<TaskStored>();
61 31326 nikon.gasp
		List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
62 17710 nikon.gasp
		for (CompletedTask ctask : tasks) {
63 17673 nikon.gasp
			TaskStored taskStored = new TaskStored();
64
			taskStored.setEnded(ctask.finished.toString());
65
			taskStored.setStarted(ctask.started.toString());
66 34342 nikon.gasp
			taskStored.setJobId(jobSubmittedId);
67 17673 nikon.gasp
			taskStored.setRecordIdentifier(ctask.valobjId);
68 34342 nikon.gasp
			taskStored.setRuleId(ctask.ruleId);
69 17673 nikon.gasp
			taskStored.setSuccess(ctask.success);
70
			taskStored.setStatus("finished");
71 17928 nikon.gasp
			tasksStored.add(taskStored);
72 31252 nikon.gasp
73 17875 nikon.gasp
			logger.debug("JOBID:"+jobSubmittedId+"# Task-failed: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
74
					" ruleId: "+taskStored.getRuleId() + " error:"+ t.getMessage());
75 17673 nikon.gasp
		}
76 31326 nikon.gasp
		this.updateScoreMap(groupBy_values, tasksStored);
77
//		taskStoredDao.saveTasks(tasksStored,groupBy_values);
78 31252 nikon.gasp
		if (objsValidated % 100 == 0)
79 34342 nikon.gasp
			jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
80 17875 nikon.gasp
		objsValidated++;
81 17673 nikon.gasp
	}
82
83
	@Override
84 17710 nikon.gasp
	@Transactional(propagation = Propagation.REQUIRED)
85 19954 nikon.gasp
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext) {
86 19683 nikon.gasp
87
		List<Map<String, String>> veloList = new ArrayList<Map<String, String>>();
88
89 17710 nikon.gasp
		logger.debug("JOBID:"+jobSubmittedId+"# Updating Completed Tasks");
90 19683 nikon.gasp
		Map<Integer,Boolean> ruleIdsSuccess = new HashMap<Integer,Boolean>();
91
		Map<String, String> veloMap = null;
92 31252 nikon.gasp
		List<TaskStored> tasksStored = new ArrayList<TaskStored>();
93
		List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
94
95 17710 nikon.gasp
		for (CompletedTask ctask : tasks) {
96 19683 nikon.gasp
			veloMap = new HashMap<String, String>();
97 17673 nikon.gasp
			TaskStored taskStored = new TaskStored();
98
			taskStored.setEnded(ctask.finished.toString());
99
			taskStored.setStarted(ctask.started.toString());
100 34342 nikon.gasp
			taskStored.setJobId(jobSubmittedId);
101 17673 nikon.gasp
			taskStored.setRecordIdentifier(ctask.valobjId);
102 34342 nikon.gasp
			taskStored.setRuleId(ctask.ruleId);
103 17673 nikon.gasp
			taskStored.setSuccess(ctask.success);
104
			taskStored.setStatus("finished");
105 17928 nikon.gasp
			tasksStored.add(taskStored);
106 31252 nikon.gasp
107 19954 nikon.gasp
//			logger.debug("JOBID:"+jobSubmittedId+"# Task: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
108
//					" ruleId: "+taskStored.getRuleId());
109 34342 nikon.gasp
			ruleIdsSuccess.put(taskStored.getRuleId(),ctask.success);
110 19683 nikon.gasp
111 19954 nikon.gasp
//			logger.debug("inserted on map: "+ "id: " + taskStored.getRuleId());
112 22190 nikon.gasp
			if (!ctask.success) {
113 34342 nikon.gasp
				veloMap.put("id", Integer.toString(taskStored.getRuleId()));
114 22190 nikon.gasp
				veloMap.put("error", "...");
115
			    veloList.add(veloMap);
116
			}
117 19683 nikon.gasp
118 17673 nikon.gasp
		}
119 31326 nikon.gasp
		this.updateScoreMap(groupBy_values, tasksStored);
120 31252 nikon.gasp
121
		// WORKFLOWS BEGIN
122 19683 nikon.gasp
		int score = this.calculateRecordScore(ruleIdsSuccess);
123
		Map<String,String> recordValidationResult = new HashMap<String,String>();
124
		recordValidationResult.put("score", Integer.toString(score));
125
		recordValidationResult.put("id", tasks.get(0).valobjId);
126
		if (score > 0)
127
			recordValidationResult.put("status", "pass");
128
		else
129
			recordValidationResult.put("status", "fail");
130 19954 nikon.gasp
131
		recordContext.put("veloList", (List<Map<String, String>>) veloList);
132
		recordContext.put("recordValidationResult", (Map<String,String>) recordValidationResult);
133
		recordContext.put("score", (int) score);
134
135 31326 nikon.gasp
136 31252 nikon.gasp
		// WORKFLOWS END
137 34342 nikon.gasp
		if (objsValidated % 10 == 0)
138
			jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
139 17875 nikon.gasp
		objsValidated++;
140 17673 nikon.gasp
	}
141
142 19954 nikon.gasp
	@Override
143
	@Transactional(propagation = Propagation.REQUIRED)
144
	public synchronized void finished(int jobId, Map<String,Object> jobContext) {
145
		internalJobsFinished++;
146
		if(internalJobsFinished == internalJobsSum) {
147 31252 nikon.gasp
			logger.debug("JOBID:"+ this.jobSubmittedId +"# Set job finished");
148 34342 nikon.gasp
			tasksDao.saveTasks(this.scoreMapPerGroupBy.get("all"));
149 19954 nikon.gasp
			jobContext.put("jobSubmittedId", (int) jobSubmittedId);
150 34342 nikon.gasp
			jobContext.put("jobSubmittedUser", (String) jobSubmittedUser);
151
152
			score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy, null, false, objsValidated, validationType);
153
			logger.debug("score_"+validationType +": " + score);
154
			jobContext.put("score_"+validationType, (int) score);
155
			if (jobSubmittedUser.contains("Workflow")) {
156
				jobsDao.setTotalJobFinished(jobId, null, false);
157
			}
158 19954 nikon.gasp
		} else {
159
			logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
160
		}
161
	}
162
163
	@Override
164
	@Transactional(propagation = Propagation.REQUIRED)
165
	public synchronized void failed(int jobId, Map<String,Object> jobContext, Throwable t) {
166
167
		internalJobsFinished++;
168
		if(internalJobsFinished == internalJobsSum) {
169
			logger.debug("JOBID:"+jobSubmittedId+"# Set job finished-failed");
170
			jobContext.put("jobSubmittedId", (int) jobSubmittedId);
171
			jobContext.put("jobSubmittedUser",(String) jobSubmittedUser);
172 34342 nikon.gasp
			score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy,t.getMessage(), true, objsValidated, validationType);
173
			jobContext.put("score_"+validationType, (int) 0);
174
175
			if (jobSubmittedUser.contains("Workflow")) {
176
				jobsDao.setTotalJobFinished(jobId, t.getMessage(), true);
177
			}
178 19954 nikon.gasp
		} else {
179
			logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
180
		}
181
	}
182
183 31326 nikon.gasp
	private synchronized void updateScoreMap(List<String> groupBy_values, List<TaskStored> tasksStored) {
184
		for (String groupBy : groupBy_values) {
185
			logger.debug("updating map for groupBy value: " + groupBy);
186 34342 nikon.gasp
			Map<Integer,RuleStatus> scoreMapPerRule = null;
187 31326 nikon.gasp
			if((scoreMapPerRule=this.scoreMapPerGroupBy.get(groupBy)) == null) {
188
				logger.debug("map for groupBy value: " + groupBy + " doesn't exist");
189 34342 nikon.gasp
				scoreMapPerRule = new HashMap<Integer, RuleStatus>();
190 31326 nikon.gasp
			} else {
191
				logger.debug("map for groupBy value: " + groupBy + " exists");
192
			}
193
			for (TaskStored task : tasksStored) {
194
				RuleStatus ruleSt = null;
195
				if((ruleSt=scoreMapPerRule.get(task.getRuleId())) == null) {
196
					logger.debug("ruleStatus for rule with id : " + task.getRuleId() + " doesn't exist");
197
					ruleSt = new RuleStatus();
198
				}
199 34342 nikon.gasp
				Rule rule = null;
200 31326 nikon.gasp
				if((rule=ruleCacheMap.get(task.getRuleId())) == null) {
201 34342 nikon.gasp
					rule = rulesDao.get(task.getRuleId());
202 31326 nikon.gasp
					ruleCacheMap.put(task.getRuleId(), rule);
203
				}
204 34781 nikon.gasp
				ruleSt.setMandatory(rule.isMandatory());
205
				ruleSt.setWeight(rule.getWeight());
206 31326 nikon.gasp
				ruleSt.total++;
207
				if (task.getSuccess())
208
					ruleSt.success++;
209
				else {
210
					if (groupBy.equals("all") && ruleSt.getFailedTasks().size() < 10 )
211
						ruleSt.getFailedTasks().add(task);
212
				}
213
				scoreMapPerRule.put(task.getRuleId(), ruleSt);
214
			}
215
			this.scoreMapPerGroupBy.put(groupBy, scoreMapPerRule);
216
		}
217
	}
218
219 19683 nikon.gasp
	private int calculateRecordScore(Map<Integer,Boolean> ruleIdsSuccess) {
220
		float score = 0;
221
		float sum = 0;
222
		float weights = 0 ;
223
		for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
224 31252 nikon.gasp
//			RuleStored rule = ruleStoredDao.get(entry.getKey().toString());
225 34342 nikon.gasp
			Rule rule = ruleCacheMap.get(entry.getKey());
226 34781 nikon.gasp
			if (rule.isMandatory()) {
227
				weights += rule.getWeight();
228 19683 nikon.gasp
				if (entry.getValue()) {
229 34781 nikon.gasp
					sum += rule.getWeight();
230 19683 nikon.gasp
				}
231
			}
232
			score = (sum/weights)*100;
233
234
		}
235
		return (int) score;
236
	}
237
238 31326 nikon.gasp
	private synchronized List<String> parseGroupByValues(Object record, String xpath) {
239 18311 nikon.gasp
		List<String> groupBy_values = null;
240
		logger.debug("groupBy field: "+xpath);
241 18252 nikon.gasp
		try{
242
			XPathFactory factory = XPathFactory.newInstance();
243
	        XPath xPath = factory.newXPath();
244 18311 nikon.gasp
	        NodeList rtl = (NodeList) xPath.evaluate(xpath+"/text()",record ,XPathConstants.NODESET);
245 18315 nikon.gasp
	        groupBy_values = new ArrayList<String>();
246 18252 nikon.gasp
	        if (rtl.getLength() > 0) {
247 18315 nikon.gasp
	        	for(int i=0; i<rtl.getLength(); i++) {
248 18252 nikon.gasp
	        		Node childNode = rtl.item(i);
249 18311 nikon.gasp
	        		groupBy_values.add(childNode.getNodeValue());
250
	        		logger.debug("value: "+childNode.getNodeValue());
251 18252 nikon.gasp
	        	}
252 31252 nikon.gasp
	        	groupBy_values.add("all");
253 18315 nikon.gasp
	        } else {
254
	        	groupBy_values.add("all");
255 18252 nikon.gasp
	        }
256 18315 nikon.gasp
257 18252 nikon.gasp
	 	} catch (XPathExpressionException e) {
258
			logger.error("error getting object"+ e);
259
	 	}
260 18311 nikon.gasp
		return groupBy_values;
261 18252 nikon.gasp
	}
262 18398 nikon.gasp
263 17710 nikon.gasp
	public Integer getJobSubmittedId() {
264
		return jobSubmittedId;
265
	}
266 17673 nikon.gasp
267 17710 nikon.gasp
	public void setJobSubmittedId(Integer jobSubmittedId) {
268
		this.jobSubmittedId = jobSubmittedId;
269 17673 nikon.gasp
	}
270
271 18398 nikon.gasp
	public String getJobSubmittedUser() {
272
		return jobSubmittedUser;
273
	}
274
275
	public void setJobSubmittedUser(String jobSubmittedUser) {
276
		this.jobSubmittedUser = jobSubmittedUser;
277
	}
278
279 34342 nikon.gasp
	public String getGroupBy_xpath() {
280
		return groupBy_xpath;
281 17710 nikon.gasp
	}
282 17673 nikon.gasp
283 34342 nikon.gasp
	public void setGroupBy_xpath(String groupBy_xpath) {
284
		this.groupBy_xpath = groupBy_xpath;
285 17673 nikon.gasp
	}
286
287 34342 nikon.gasp
	public String getValBaseUrl() {
288
		return valBaseUrl;
289 17710 nikon.gasp
	}
290 17673 nikon.gasp
291 34342 nikon.gasp
	public void setValBaseUrl(String valBaseUrl) {
292
		this.valBaseUrl = valBaseUrl;
293 17673 nikon.gasp
	}
294
295 34342 nikon.gasp
	public int getInternalJobsSum() {
296
		return internalJobsSum;
297 17875 nikon.gasp
	}
298
299 34342 nikon.gasp
	public void setInternalJobsSum(int internalJobsSum) {
300
		this.internalJobsSum = internalJobsSum;
301 17875 nikon.gasp
	}
302
303 34342 nikon.gasp
304
	public Map<String, Map<Integer, RuleStatus>> getScoreMapPerGroupBy() {
305
		return scoreMapPerGroupBy;
306 17673 nikon.gasp
	}
307
308 34342 nikon.gasp
	public void setScoreMapPerGroupBy(
309
			Map<String, Map<Integer, RuleStatus>> scoreMapPerGroupBy) {
310
		this.scoreMapPerGroupBy = scoreMapPerGroupBy;
311 17673 nikon.gasp
	}
312 18311 nikon.gasp
313 34342 nikon.gasp
	public TasksDAO getTasksDao() {
314
		return tasksDao;
315 18311 nikon.gasp
	}
316
317 34342 nikon.gasp
	public void setTasksDao(TasksDAO tasksDao) {
318
		this.tasksDao = tasksDao;
319 18311 nikon.gasp
	}
320
321 34342 nikon.gasp
	public JobsDAO getJobsDao() {
322
		return jobsDao;
323 18483 nikon.gasp
	}
324
325 34342 nikon.gasp
	public void setJobsDao(JobsDAO jobsDao) {
326
		this.jobsDao = jobsDao;
327 18483 nikon.gasp
	}
328
329 34342 nikon.gasp
	public RulesDAO getRulesDao() {
330
		return rulesDao;
331 18483 nikon.gasp
	}
332
333 34342 nikon.gasp
	public void setRulesDao(RulesDAO rulesDao) {
334
		this.rulesDao = rulesDao;
335 18483 nikon.gasp
	}
336 31252 nikon.gasp
337 34342 nikon.gasp
	public Map<Integer, Rule> getRuleCacheMap() {
338 31252 nikon.gasp
		return ruleCacheMap;
339
	}
340
341 34342 nikon.gasp
	public void setRuleCacheMap(Map<Integer, Rule> ruleCacheMap) {
342 31252 nikon.gasp
		this.ruleCacheMap = ruleCacheMap;
343
	}
344 34342 nikon.gasp
345
	public String getValidationType() {
346
		return validationType;
347
	}
348
349
	public void setValidationType(String validationType) {
350
		this.validationType = validationType;
351
	}
352 18252 nikon.gasp
}