Project

General

Profile

1
package eu.dnetlib.validator.service.impls.listeners;
2

    
3
import java.util.ArrayList;
4
import java.util.HashMap;
5
import java.util.List;
6
import java.util.Map;
7

    
8
import javax.xml.xpath.XPath;
9
import javax.xml.xpath.XPathConstants;
10
import javax.xml.xpath.XPathExpressionException;
11
import javax.xml.xpath.XPathFactory;
12

    
13
import org.apache.log4j.Logger;
14
import org.springframework.transaction.annotation.Propagation;
15
import org.springframework.transaction.annotation.Transactional;
16
import org.w3c.dom.Node;
17
import org.w3c.dom.NodeList;
18

    
19
import eu.dnetlib.domain.functionality.validator.Rule;
20
import eu.dnetlib.validator.commons.dao.jobs.JobsDAO;
21
import eu.dnetlib.validator.commons.dao.rules.RuleStatus;
22
import eu.dnetlib.validator.commons.dao.rules.RulesDAO;
23
import eu.dnetlib.validator.commons.dao.tasks.TaskStored;
24
import eu.dnetlib.validator.commons.dao.tasks.TasksDAO;
25
import eu.dnetlib.validator.engine.execution.CompletedTask;
26
import eu.dnetlib.validator.engine.execution.JobListener;
27

    
28
/**
29
 * 
30
 * @author Nikon Gasparis
31
 *
32
 */
33
public class ValidatorJobListener implements JobListener {
34
	private static Logger logger = Logger.getLogger(ValidatorJobListener.class);
35

    
36
	private Integer jobSubmittedId;
37
	private String jobSubmittedUser;
38
	private TasksDAO tasksDao;
39
	private JobsDAO jobsDao;
40
	private RulesDAO rulesDao;
41
	private String validationType;
42
	
43
	private int objsValidated = 0;
44
	private int score = 0;
45
	private int internalJobsFinished = 0;
46
	
47
	Map<String,Map<Integer,RuleStatus>> scoreMapPerGroupBy = new HashMap<String,Map<Integer,RuleStatus>>();
48
	private String valBaseUrl = null;
49
	
50
	private int internalJobsSum = 1;
51

    
52

    
53
	private Map<Integer, Rule> ruleCacheMap = new HashMap<Integer, Rule>();
54
	private String groupBy_xpath = null;
55

    
56
	@Override
57
	@Transactional(propagation = Propagation.REQUIRED)
58
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext, Throwable t) {
59
		logger.error("Completed tasks for " + jobId + " (exception). Error: " + t.getMessage());
60
		List<TaskStored> tasksStored = new ArrayList<TaskStored>();
61
		List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
62
		for (CompletedTask ctask : tasks) {
63
			TaskStored taskStored = new TaskStored();
64
			taskStored.setEnded(ctask.finished.toString());
65
			taskStored.setStarted(ctask.started.toString());
66
			taskStored.setJobId(jobSubmittedId);
67
			taskStored.setRecordIdentifier(ctask.valobjId);
68
			taskStored.setRuleId(ctask.ruleId);
69
			taskStored.setSuccess(ctask.success);
70
			taskStored.setStatus("finished");
71
			tasksStored.add(taskStored);
72
			
73
			logger.debug("JOBID:"+jobSubmittedId+"# Task-failed: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
74
					" ruleId: "+taskStored.getRuleId() + " error:"+ t.getMessage());
75
		}
76
		this.updateScoreMap(groupBy_values, tasksStored);
77
//		taskStoredDao.saveTasks(tasksStored,groupBy_values);
78
		if (objsValidated % 100 == 0)
79
			jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
80
		objsValidated++;		
81
	}
82

    
83
	@Override
84
	@Transactional(propagation = Propagation.REQUIRED)
85
	public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String,Object> recordContext) {
86
			
87
		List<Map<String, String>> veloList = new ArrayList<Map<String, String>>();
88
			
89
		logger.debug("JOBID:"+jobSubmittedId+"# Updating Completed Tasks");
90
		Map<Integer,Boolean> ruleIdsSuccess = new HashMap<Integer,Boolean>();
91
		Map<String, String> veloMap = null;
92
		List<TaskStored> tasksStored = new ArrayList<TaskStored>();
93
		List<String> groupBy_values = this.parseGroupByValues(record,groupBy_xpath);
94
		
95
		for (CompletedTask ctask : tasks) {
96
			veloMap = new HashMap<String, String>();
97
			TaskStored taskStored = new TaskStored();
98
			taskStored.setEnded(ctask.finished.toString());
99
			taskStored.setStarted(ctask.started.toString());
100
			taskStored.setJobId(jobSubmittedId);
101
			taskStored.setRecordIdentifier(ctask.valobjId);
102
			taskStored.setRuleId(ctask.ruleId);
103
			taskStored.setSuccess(ctask.success);
104
			taskStored.setStatus("finished");
105
			tasksStored.add(taskStored);
106
			
107
//			logger.debug("JOBID:"+jobSubmittedId+"# Task: rule " + ctask.ruleId + " on " + ctask.valobjId + " with success " + ctask.success+
108
//					" ruleId: "+taskStored.getRuleId());
109
			ruleIdsSuccess.put(taskStored.getRuleId(),ctask.success);
110
			
111
//			logger.debug("inserted on map: "+ "id: " + taskStored.getRuleId());
112
			if (!ctask.success) {
113
				veloMap.put("id", Integer.toString(taskStored.getRuleId()));
114
				veloMap.put("error", "...");
115
			    veloList.add(veloMap);
116
			}
117
				
118
		}
119
		this.updateScoreMap(groupBy_values, tasksStored);
120

    
121
		// WORKFLOWS BEGIN
122
		int score = this.calculateRecordScore(ruleIdsSuccess);
123
		Map<String,String> recordValidationResult = new HashMap<String,String>();
124
		recordValidationResult.put("score", Integer.toString(score));
125
		recordValidationResult.put("id", tasks.get(0).valobjId);
126
		if (score > 0)
127
			recordValidationResult.put("status", "pass");
128
		else
129
			recordValidationResult.put("status", "fail");
130
		
131
		recordContext.put("veloList", (List<Map<String, String>>) veloList);
132
		recordContext.put("recordValidationResult", (Map<String,String>) recordValidationResult);
133
		recordContext.put("score", (int) score);
134
		
135
		
136
		// WORKFLOWS END
137
		if (objsValidated % 10 == 0)
138
			jobsDao.setStatus(jobSubmittedId, "ongoing", objsValidated, validationType);
139
		objsValidated++;
140
	}
141

    
142
	@Override
143
	@Transactional(propagation = Propagation.REQUIRED)
144
	public synchronized void finished(int jobId, Map<String,Object> jobContext) {
145
		internalJobsFinished++;
146
		if(internalJobsFinished == internalJobsSum) { 
147
			logger.debug("JOBID:"+ this.jobSubmittedId +"# Set job finished");
148
			tasksDao.saveTasks(this.scoreMapPerGroupBy.get("all"));
149
			jobContext.put("jobSubmittedId", (int) jobSubmittedId);
150
			jobContext.put("jobSubmittedUser", (String) jobSubmittedUser);
151

    
152
			score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy, null, false, objsValidated, validationType);
153
			logger.debug("score_"+validationType +": " + score);
154
			jobContext.put("score_"+validationType, (int) score);
155
			if (jobSubmittedUser.contains("Workflow")) {
156
				jobsDao.setTotalJobFinished(jobId, null, false);
157
			}
158
		} else {
159
			logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
160
		}
161
	}
162
	
163
	@Override
164
	@Transactional(propagation = Propagation.REQUIRED)
165
	public synchronized void failed(int jobId, Map<String,Object> jobContext, Throwable t) {
166
		
167
		internalJobsFinished++;
168
		if(internalJobsFinished == internalJobsSum) { 		
169
			logger.debug("JOBID:"+jobSubmittedId+"# Set job finished-failed");
170
			jobContext.put("jobSubmittedId", (int) jobSubmittedId);
171
			jobContext.put("jobSubmittedUser",(String) jobSubmittedUser);
172
			score = jobsDao.setJobFinished(jobSubmittedId, this.scoreMapPerGroupBy,t.getMessage(), true, objsValidated, validationType);
173
			jobContext.put("score_"+validationType, (int) 0);
174
			
175
			if (jobSubmittedUser.contains("Workflow")) {
176
				jobsDao.setTotalJobFinished(jobId, t.getMessage(), true);
177
			}
178
		} else {
179
			logger.debug("JOBID:"+jobSubmittedId+"#Job finished. Waiting "+ (internalJobsSum-internalJobsFinished) + " job(s) to finish" );
180
		}
181
	}
182
			
183
	private synchronized void updateScoreMap(List<String> groupBy_values, List<TaskStored> tasksStored) {
184
		for (String groupBy : groupBy_values) {
185
			logger.debug("updating map for groupBy value: " + groupBy);
186
			Map<Integer,RuleStatus> scoreMapPerRule = null;
187
			if((scoreMapPerRule=this.scoreMapPerGroupBy.get(groupBy)) == null) {
188
				logger.debug("map for groupBy value: " + groupBy + " doesn't exist");
189
				scoreMapPerRule = new HashMap<Integer, RuleStatus>();
190
			} else {
191
				logger.debug("map for groupBy value: " + groupBy + " exists");
192
			}
193
			for (TaskStored task : tasksStored) {
194
				RuleStatus ruleSt = null;
195
				if((ruleSt=scoreMapPerRule.get(task.getRuleId())) == null) {
196
					logger.debug("ruleStatus for rule with id : " + task.getRuleId() + " doesn't exist");
197
					ruleSt = new RuleStatus();
198
				} 
199
				Rule rule = null;
200
				if((rule=ruleCacheMap.get(task.getRuleId())) == null) {
201
					rule = rulesDao.get(task.getRuleId());
202
					ruleCacheMap.put(task.getRuleId(), rule);
203
				}
204
				ruleSt.setMandatory(rule.isMandatory());
205
				ruleSt.setWeight(rule.getWeight());
206
				ruleSt.total++;
207
				if (task.getSuccess())
208
					ruleSt.success++;
209
				else {
210
					if (groupBy.equals("all") && ruleSt.getFailedTasks().size() < 10 )
211
						ruleSt.getFailedTasks().add(task);
212
				}
213
				scoreMapPerRule.put(task.getRuleId(), ruleSt);
214
			}
215
			this.scoreMapPerGroupBy.put(groupBy, scoreMapPerRule);
216
		}
217
	} 
218
	
219
	private int calculateRecordScore(Map<Integer,Boolean> ruleIdsSuccess) {
220
		float score = 0;
221
		float sum = 0;
222
		float weights = 0 ;
223
		for (Map.Entry<Integer,Boolean> entry : ruleIdsSuccess.entrySet()) {
224
//			RuleStored rule = ruleStoredDao.get(entry.getKey().toString());
225
			Rule rule = ruleCacheMap.get(entry.getKey());
226
			if (rule.isMandatory()) {
227
				weights += rule.getWeight();
228
				if (entry.getValue()) {
229
					sum += rule.getWeight();
230
				}
231
			}
232
			score = (sum/weights)*100;
233
			
234
		}
235
		return (int) score;
236
	}
237

    
238
	private synchronized List<String> parseGroupByValues(Object record, String xpath) {
239
		List<String> groupBy_values = null;
240
		logger.debug("groupBy field: "+xpath);
241
		try{
242
			XPathFactory factory = XPathFactory.newInstance();
243
	        XPath xPath = factory.newXPath();
244
	        NodeList rtl = (NodeList) xPath.evaluate(xpath+"/text()",record ,XPathConstants.NODESET);
245
	        groupBy_values = new ArrayList<String>();
246
	        if (rtl.getLength() > 0) {
247
	        	for(int i=0; i<rtl.getLength(); i++) {
248
	        		Node childNode = rtl.item(i);
249
	        		groupBy_values.add(childNode.getNodeValue());
250
	        		logger.debug("value: "+childNode.getNodeValue());
251
	        	}
252
	        	groupBy_values.add("all");
253
	        } else {
254
	        	groupBy_values.add("all");
255
	        }
256
	        	
257
	 	} catch (XPathExpressionException e) {
258
			logger.error("error getting object"+ e);
259
	 	}
260
		return groupBy_values;
261
	}
262
	
263
	public Integer getJobSubmittedId() {
264
		return jobSubmittedId;
265
	}
266

    
267
	public void setJobSubmittedId(Integer jobSubmittedId) {
268
		this.jobSubmittedId = jobSubmittedId;
269
	}
270

    
271
	public String getJobSubmittedUser() {
272
		return jobSubmittedUser;
273
	}
274

    
275
	public void setJobSubmittedUser(String jobSubmittedUser) {
276
		this.jobSubmittedUser = jobSubmittedUser;
277
	}
278

    
279
	public String getGroupBy_xpath() {
280
		return groupBy_xpath;
281
	}
282

    
283
	public void setGroupBy_xpath(String groupBy_xpath) {
284
		this.groupBy_xpath = groupBy_xpath;
285
	}
286

    
287
	public String getValBaseUrl() {
288
		return valBaseUrl;
289
	}
290

    
291
	public void setValBaseUrl(String valBaseUrl) {
292
		this.valBaseUrl = valBaseUrl;
293
	}
294

    
295
	public int getInternalJobsSum() {
296
		return internalJobsSum;
297
	}
298

    
299
	public void setInternalJobsSum(int internalJobsSum) {
300
		this.internalJobsSum = internalJobsSum;
301
	}
302

    
303

    
304
	public Map<String, Map<Integer, RuleStatus>> getScoreMapPerGroupBy() {
305
		return scoreMapPerGroupBy;
306
	}
307

    
308
	public void setScoreMapPerGroupBy(
309
			Map<String, Map<Integer, RuleStatus>> scoreMapPerGroupBy) {
310
		this.scoreMapPerGroupBy = scoreMapPerGroupBy;
311
	}
312

    
313
	public TasksDAO getTasksDao() {
314
		return tasksDao;
315
	}
316

    
317
	public void setTasksDao(TasksDAO tasksDao) {
318
		this.tasksDao = tasksDao;
319
	}
320

    
321
	public JobsDAO getJobsDao() {
322
		return jobsDao;
323
	}
324

    
325
	public void setJobsDao(JobsDAO jobsDao) {
326
		this.jobsDao = jobsDao;
327
	}
328

    
329
	public RulesDAO getRulesDao() {
330
		return rulesDao;
331
	}
332

    
333
	public void setRulesDao(RulesDAO rulesDao) {
334
		this.rulesDao = rulesDao;
335
	}
336

    
337
	public Map<Integer, Rule> getRuleCacheMap() {
338
		return ruleCacheMap;
339
	}
340

    
341
	public void setRuleCacheMap(Map<Integer, Rule> ruleCacheMap) {
342
		this.ruleCacheMap = ruleCacheMap;
343
	}
344

    
345
	public String getValidationType() {
346
		return validationType;
347
	}
348

    
349
	public void setValidationType(String validationType) {
350
		this.validationType = validationType;
351
	}
352
}
353

    
(7-7/7)