Project

General

Profile

« Previous | Next » 

Revision 47446

moving to dnet45

View differences:

modules/uoa-validator-service/trunk/deploy.info
1
{
2
  "type_source": "SVN", 
3
  "goal": "package -U source:jar", 
4
  "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet40/modules/uoa-validator-service/trunk", 
5
  "deploy_repository": "dnet4-snapshots", 
6
  "version": "4", 
7
  "mail": "antleb@di.uoa.gr, kiatrop@di.uoa.gr, nikonas@di.uoa.gr", 
8
  "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet4-snapshots", 
9
  "name": "uoa-validator-service"
10
}
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impl/ValidatorManager.java
1
package eu.dnetlib.validator.service.impl;
2

  
3
import eu.dnetlib.domain.functionality.validator.JobForValidation;
4
import eu.dnetlib.domain.functionality.validator.RuleSet;
5
import eu.dnetlib.domain.functionality.validator.StoredJob;
6
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
7
import eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler;
8
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
9
import eu.dnetlib.validator.engine.ValidatorException;
10

  
11
import java.util.List;
12

  
13
public interface ValidatorManager {
14

  
15
    StoredJob beginDataJobForWorkflow(String datasource, String guidelines, String groupBy, int records, int workers, BlackboardJob bJob, BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler, int jobStatusUpdateInterval, boolean outputEpr, boolean blacklistedRecords, String blacklistGuidelines) throws ValidatorException;
16

  
17
    StoredJob getStoredJob(int jobId, String groupBy) throws ValidatorException;
18

  
19
    List<StoredJob> getStoredJobs(String userMail, String jobType,
20
                                         Integer offset, Integer limit, String dateFrom, String dateTo) throws ValidatorException;
21

  
22
    List<StoredJob> getStoredJobs(String userMail, String jobType,
23
                                  Integer offset, Integer limit, String dateFrom, String dateTo, String jobStatus) throws ValidatorException;
24

  
25
    List<RuleSet> getRuleSets() throws ValidatorException;
26

  
27
    void submitJob(JobForValidation job) throws ValidatorException;
28

  
29
    int getStoredJobsTotalNumber(String userMail, String jobType) throws ValidatorException;
30

  
31
    int getStoredJobsTotalNumber(String userMail, String jobType, String jobStatus) throws ValidatorException;
32

  
33
}
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impl/ValidatorManagerImpl.java
1
package eu.dnetlib.validator.service.impl;
2

  
3
import eu.dnetlib.domain.functionality.validator.CustomProperties;
4
import eu.dnetlib.domain.functionality.validator.JobForValidation;
5
import eu.dnetlib.domain.functionality.validator.RuleSet;
6
import eu.dnetlib.domain.functionality.validator.StoredJob;
7
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
8
import eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler;
9
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
10
import eu.dnetlib.validator.commons.dao.DaoException;
11
import eu.dnetlib.validator.commons.dao.jobs.JobsDAO;
12
import eu.dnetlib.validator.commons.dao.rules.RulesDAO;
13
import eu.dnetlib.validator.commons.dao.rules.RulesetsDAO;
14
import eu.dnetlib.validator.engine.Validator;
15
import eu.dnetlib.validator.engine.ValidatorException;
16
import eu.dnetlib.validator.engine.data.Rule;
17
import eu.dnetlib.validator.engine.execution.Job;
18
import eu.dnetlib.validator.service.impls.ValidatorRestore;
19
import eu.dnetlib.validator.service.impls.listeners.*;
20
import eu.dnetlib.validator.service.impls.providers.DnetProvider;
21
import eu.dnetlib.validator.service.impls.providers.OAIPMHRecordProvider;
22
import eu.dnetlib.validator.service.impls.providers.OAIPMHSinglePageVerbProvider;
23
import eu.dnetlib.validator.service.impls.rules.ChainRule;
24
import eu.dnetlib.validator.service.impls.rules.RuleTypes;
25
import org.apache.log4j.Logger;
26
import org.springframework.transaction.annotation.Propagation;
27
import org.springframework.transaction.annotation.Transactional;
28

  
29
import java.util.*;
30

  
31
@Transactional
32
public class ValidatorManagerImpl implements ValidatorManager {
33

  
34
	private Validator validator;
35
	private JobsDAO jobsDao;
36
	private RulesetsDAO rulesetsDao;
37
	private RulesDAO rulesDao;
38
	private ValidatorRestore valRestore;
39
	
40
	
41
	private static Logger logger = Logger.getLogger(ValidatorManagerImpl.class);
42

  
43
	private ListenersManager listenersManager;
44
//	this.restartJobs();
45
	public void start() {
46
		logger.info("Initializing Validator Manager module");
47
		if (valRestore.isAutoRestoreOnStartup()) {
48
			logger.info("auto-restoring OpenAire Validator is enabled..");
49
			List<StoredJob> jobs = valRestore.deleteUncompleted();
50
			valRestore.restartJobs(jobs);
51
			logger.info("finished restoring OpenAire Validator..");
52
		} else {
53
			logger.info("auto-restoring OpenAire Validator is disabled..");
54
		}
55
	}
56
	
57
	public StoredJob getStoredJob(int jobId, String groupBy)
58
			throws ValidatorException {
59
		try {
60
			logger.info("Getting job summary for job " + jobId +" with groupBy: "+groupBy);
61
			return jobsDao.getJobSummary(jobId,groupBy);
62
		} catch (Exception e) {
63
			logger.error("error getting job summary for job " + jobId, e);
64
			throw new ValidatorException(e);
65
		}
66
	}
67
	
68
	public List<StoredJob> getStoredJobs(String userMail, String jobType,
69
			Integer offset, Integer limit, String dateFrom, String dateTo)
70
			throws ValidatorException {
71
		return this.getStoredJobs(userMail, jobType, offset, limit, dateFrom, dateTo, null);
72

  
73
	}
74

  
75
	public List<StoredJob> getStoredJobs(String userMail, String jobType,
76
										 Integer offset, Integer limit, String dateFrom, String dateTo, String jobStatus)
77
			throws ValidatorException {
78
		try {
79
			logger.debug("Getting jobs of user " + userMail);
80
			return jobsDao.getJobs(userMail, jobType, offset, limit, dateFrom, dateTo, jobStatus);
81
		} catch (Exception e) {
82
			logger.error("error Getting jobs of user " + userMail, e);
83
			throw new ValidatorException(e);
84
		}
85
	}
86

  
87
	public int getStoredJobsTotalNumber(String userMail, String jobType)
88
			throws ValidatorException {
89
		return this.getStoredJobsTotalNumber(userMail, jobType, null);
90
	}
91

  
92
	public int getStoredJobsTotalNumber(String userMail, String jobType, String jobStatus)
93
			throws ValidatorException {
94
		try {
95
			logger.debug("Getting jobs total sum of user " + userMail);
96
			return jobsDao.getJobsTotalNumber(userMail, jobType, jobStatus);
97
		} catch (Exception e) {
98
			logger.error("error Getting jobs sum of user " + userMail, e);
99
			throw new ValidatorException(e);
100
		}
101
	}
102
	
103
	public List<RuleSet> getRuleSets() throws ValidatorException {
104
		try {
105
			logger.info("Getting rulesets");
106
			return rulesetsDao.getRuleSets();
107
		} catch (Exception e) {
108
			logger.error("error Getting rulesets ", e);
109
			throw new ValidatorException(e);
110
		}
111
	}
112

  
113
	@Transactional(readOnly = false, propagation = Propagation.REQUIRES_NEW)
114
	public StoredJob beginDataJobForWorkflow(String mdstoreId, String guidelines, String groupBy, int records, int workers, BlackboardJob bJob, BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler, int jobStatusUpdateInterval, boolean outputEpr, boolean blacklistedRecords, String blacklistGuidelines) throws ValidatorException{
115
		try {
116
			logger.info("Submitting data job for workflow");
117
//			String desiredCompatibility = "openaire3.0";
118
			Set<Integer> ruleIdsForBlacklist = new HashSet<Integer>();
119
			Set<Integer> ruleIdsContent = new HashSet<Integer>();
120
			for (RuleSet ruleset : rulesetsDao.getRuleSets()) {
121
				if (ruleset.getGuidelinesAcronym().equals(guidelines)) {
122
					ruleIdsContent = ruleset.getContentRulesIds();
123
				}	
124
				if (blacklistedRecords && ruleset.getGuidelinesAcronym().equals(blacklistGuidelines)) {
125
					ruleIdsForBlacklist = ruleset.getContentRulesIds();
126
					ruleIdsContent.addAll(ruleIdsForBlacklist);
127
				}
128
			}
129
			
130
			Properties props = new Properties();
131

  
132
			props.setProperty(DnetProvider.MDSTORE_ID, mdstoreId);
133
			props.setProperty(DnetProvider.BATCH_SIZE, "50");
134
			if (bJob.getParameters().get("batchSize") != null )
135
				props.setProperty(DnetProvider.BATCH_SIZE, bJob.getParameters().get("batchSize"));
136
			
137
			props.setProperty(DnetProvider.RECORDS, Integer.toString(records));
138
			
139
			StoredJob newJob = new StoredJob();
140
			newJob.setUserEmail("Workflow Service");
141
			newJob.setValidationType("C");
142
			newJob.setDesiredCompatibilityLevel(guidelines);
143
			newJob.setContentJobStatus("ongoing");
144
			newJob.setUsageJobStatus("none");
145
			newJob.setJobType("Workflow Request");
146
			newJob.setDuration("--");
147
			newJob.setBaseUrl(mdstoreId);
148
			newJob.setRules(ruleIdsContent);
149
			newJob.setRecords(records);
150
			newJob.setValidationSet("dnet-workflow");
151
			newJob.setGroupByXpath(groupBy);
152
			newJob.setMetadataPrefix("oai_dc");
153
			newJob.setId(-1);
154
	
155
			int jobIdStored = jobsDao.save(newJob);
156
					
157
			Set<Rule> rulesContent = new HashSet<Rule>();
158
			
159
			logger.debug("Selected content rules number: " + ruleIdsContent.size());
160
			for (Integer ruleId : ruleIdsContent){
161
				
162
				eu.dnetlib.domain.functionality.validator.Rule tempRule=rulesDao.get(ruleId);
163
				
164
				//special behaviour type of rule is chain
165
				if(tempRule.getType().equals("ChainRule")) {
166
					ChainRule<Rule> chainRule = this.handleChain(tempRule);
167
					rulesContent.add(chainRule);
168
				}
169
				else {
170
					rulesContent.add((Rule) this.getRuleClassInstanceByType(tempRule.getType(), tempRule.getConfiguration(), tempRule.getId()));
171
				}
172
			}
173
			
174
			Job jobContent = new Job(jobIdStored, 3, rulesContent, props);
175
			
176
			ValidatorJobListener listenerContent = listenersManager.createListener();
177
			listenerContent.setJobSubmittedId(jobIdStored);
178
			listenerContent.setJobSubmittedUser("Workflow Service");
179
			listenerContent.setGroupBy_xpath(groupBy);
180
			listenerContent.setValidationType("content");
181
			listenerContent.setInternalJobsSum(workers);
182
			listenerContent.setBlacklistRuleIds(ruleIdsForBlacklist);
183

  
184
			DnetListener dnetListener = listenersManager.createDnetListener();
185
			dnetListener.setJob(bJob);
186
			dnetListener.setBlackboardHandler(blackboardHandler);
187
			dnetListener.setInternalJobsSum(workers);
188
			dnetListener.setValidationJobId(jobIdStored);
189
			dnetListener.setJobStatusUpdateInterval(jobStatusUpdateInterval);
190
			if (blacklistedRecords) {
191
				dnetListener.setEnableOutputToDisk(true);
192
				
193
			}
194
			if (outputEpr)
195
				dnetListener.setEnableOutputToRS(true);
196

  
197
			validator.submitJob(jobContent, workers, listenerContent, dnetListener);
198
				
199
			return newJob;
200
			
201
			
202
		} catch (Exception e) {
203
			logger.error("Error Submitting content job", e);
204
			throw new ValidatorException(e);
205
		}
206
	}
207
	
208
	@Transactional(readOnly = false, propagation = Propagation.REQUIRED)
209
	public void submitJob(JobForValidation job) throws ValidatorException {
210

  
211
		try {
212
			logger.info("Submiting validation job requested by user: " + job.getUserEmail());
213

  
214
			if (job.isRegistration()) {
215
				logger.debug("initiating preregistration validations on repo " + job.getBaseUrl() + " for user " + job.getUserEmail() + "and desired compatibility: " + job.getDesiredCompatibilityLevel());
216
				for (RuleSet ruleset : rulesetsDao.getRuleSets()) {
217
					if (ruleset.getGuidelinesAcronym().equals(job.getDesiredCompatibilityLevel())) {
218
						job.setSelectedContentRules(ruleset.getContentRulesIds());
219
						job.setSelectedUsageRules(ruleset.getUsageRulesIds());
220
					}	
221
				}	
222
			}
223
			
224
			if ( job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire2.0") || 
225
					job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire3.0") || 
226
					job.getDesiredCompatibilityLevel().equalsIgnoreCase("driver") ) {
227
				logger.debug("Chosen set: OpenAIRE For Literature Repositories");
228
				logger.debug("Setting METADATA_PREFIX to: oai_dc");
229
				job.setMetadataPrefix("oai_dc");
230
			} else if ( job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire2.0_data")) {
231
				logger.debug("Chosen set: OpenAIRE For Data Archives");
232
				logger.debug("Setting METADATA_PREFIX to: oai_datacite");
233
				job.setMetadataPrefix("oai_datacite");			
234
			} else if ( job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire3.0_cris")) {
235
				logger.debug("Chosen set: OpenAIRE For Cris");
236
				logger.debug("Setting METADATA_PREFIX to: oai_CERIF_openaire");
237
				job.setMetadataPrefix("oai_CERIF_openaire");			
238
			}
239
			
240
			
241
			StoredJob newJob = new StoredJob(job);
242

  
243
			int usageJobs = 0;
244
			int contentJobs = 0;
245
			Set<Integer> totalRules = new HashSet<Integer>();
246
			Map<String,Set<Integer>> verbRuleMap = null;
247
			String validationType = "";
248
			
249
			if (job.getSelectedContentRules() != null) {
250
				contentJobs = 1;
251
				validationType +="C";
252
				totalRules.addAll(job.getSelectedContentRules());
253
				newJob.setContentJobStatus("ongoing");
254
			} else {
255
				newJob.setContentJobStatus("none");
256
			}
257
			if (job.getSelectedUsageRules() != null) {
258
				logger.debug("Creating map for provider information");
259
				verbRuleMap= new HashMap<String,Set<Integer>>();
260
				Set<Integer> old,temp = null;
261
				for (Integer id : job.getSelectedUsageRules()){
262
					eu.dnetlib.domain.functionality.validator.Rule ruleStored = rulesDao.get(id);
263
					logger.debug("Checking for verb : "+ruleStored.getProvider_information());
264
			        if((old=verbRuleMap.get(ruleStored.getProvider_information())) == null){
265
			        	logger.debug("Verb doesn't exist");
266
			            temp = new HashSet<Integer>();
267
			            temp.add(ruleStored.getId());
268
			            verbRuleMap.put(ruleStored.getProvider_information(),temp);
269
			        }else{
270
			        	logger.debug("Verb exists");
271
			            old.add(ruleStored.getId());
272
			        }
273
				}
274
				usageJobs = verbRuleMap.size();
275
				validationType +="U";
276
				totalRules.addAll(job.getSelectedUsageRules());
277
				newJob.setUsageJobStatus("ongoing");
278
			} else {
279
				newJob.setUsageJobStatus("none");
280
			}
281
			
282
			
283
			Map<String, Set<Rule>> entityChosenRulesMap = null;
284
			Map<String, Set<Rule>> entityChosenRulesMapReferential = null;
285
			
286
			if (job.isCris()) {
287
				entityChosenRulesMap = new HashMap<String, Set<Rule>>();
288
				entityChosenRulesMapReferential = new HashMap<String, Set<Rule>>();
289
				this.prepareCrisJobs(job, entityChosenRulesMap, entityChosenRulesMapReferential);
290
				newJob.setGroupByXpath("//header/setSpec");
291
				contentJobs = entityChosenRulesMap.keySet().size() + entityChosenRulesMapReferential.keySet().size();
292
				//TODO move in uoa-domain
293
				newJob.setCris(true);
294
				newJob.setSelectedCrisEntities(job.getSelectedCrisEntities());
295
				newJob.setCrisReferentialChecks(job.isCrisReferentialChecks());
296
			}
297
			newJob.setValidationType(validationType);
298
			if (job.isRegistration()) {
299
				newJob.setJobType("Registration Request");
300
				//TODO move in uoa-domain
301
				newJob.setActivationId(job.getActivationId());
302
				newJob.setRegistration(true);
303
				newJob.setUpdateExisting(job.isUpdateExisting());
304
				newJob.setOfficialName(job.getOfficialName());
305
				newJob.setAdminEmails(job.getAdminEmails());
306
				newJob.setDatasourceId(job.getDatasourceId());
307
				newJob.setInterfaceId(job.getInterfaceId());
308
				newJob.setInterfaceIdOld(job.getInterfaceIdOld());
309
				newJob.setRepoType(job.getRepoType());
310
			} else
311
				newJob.setJobType("Compatibility Test");
312
			
313
			newJob.setDuration("--");
314
			newJob.setRules(totalRules);
315
			newJob.setId(-1);
316
	
317
			int jobIdStored = jobsDao.save(newJob);
318
			
319

  
320
			RegistrationListener regListener = null;
321
			if (job.isRegistration()) {
322
				regListener = listenersManager.createRegListener();
323
				regListener.setActivationId(job.getActivationId());
324
				regListener.setBaseUrl(job.getBaseUrl());
325
				regListener.setActivationId(job.getActivationId());
326
				regListener.setAdminEmails(job.getAdminEmails());
327
				regListener.setOfficialName(job.getOfficialName());
328
				regListener.setUserMail(job.getUserEmail());
329
				regListener.setDatasourceId(job.getDatasourceId());
330
				regListener.setInterfaceId(job.getInterfaceId());
331
				regListener.setValidationSet(job.getValidationSet());
332
				regListener.setDesiredCompLevel(job.getDesiredCompatibilityLevel());
333
				regListener.setRepoType(job.getRepoType());
334
				regListener.setInterfaceIdOld(job.getInterfaceIdOld());
335
				regListener.setUpdateExisting(job.isUpdateExisting());	
336
				regListener.setTotalJobs(usageJobs + contentJobs);
337
			}
338
			
339
			CompatibilityTestListener compTestListener = listenersManager.createCompTestListener();
340
			compTestListener.setValidationSet(job.getValidationSet());
341
			compTestListener.setGuidelines(job.getDesiredCompatibilityLevel());
342
			compTestListener.setTotalJobs(usageJobs + contentJobs);
343

  
344
			
345
//CONTENT
346
			if (job.getSelectedContentRules() != null) {
347
			
348
				Set<Rule> rulesContent = new HashSet<Rule>();
349
				
350
				Properties props = new Properties();
351
				props.setProperty(OAIPMHRecordProvider.BASEURL, job.getBaseUrl());
352
				props.setProperty(OAIPMHRecordProvider.METADATA_PREFIX, newJob.getMetadataPrefix());
353
				props.setProperty(OAIPMHRecordProvider.RECORDS,Integer.toString(job.getRecords()));
354
				props.setProperty(OAIPMHRecordProvider.SET,job.getValidationSet());
355
				
356
		
357
				Job jobContent = null;
358
				
359
				ValidatorJobListener listenerContent = listenersManager.createListener();
360
				listenerContent.setJobSubmittedId(jobIdStored);
361
				listenerContent.setJobSubmittedUser(job.getUserEmail());
362
				listenerContent.setGroupBy_xpath(newJob.getGroupByXpath());
363
				listenerContent.setValidationType("content");
364
				listenerContent.setInternalJobsSum(contentJobs);
365
				
366

  
367
				if (job.isCris()) {
368
					logger.debug("Submiting job for cris.");
369
					logger.debug("Total content jobs: " + contentJobs);
370
//					logger.debug("Selected content rules number: " + job.getSelectedContentRules().size());
371
					jobContent = new Job(jobIdStored, 4, rulesContent, props);
372
					validator.submitJobForCris(jobContent, entityChosenRulesMap, entityChosenRulesMapReferential, listenerContent, compTestListener);
373
				} else {
374
					
375
					logger.debug("Selected content rules number: " + job.getSelectedContentRules().size());
376
					for (Integer ruleId : job.getSelectedContentRules()){
377
						
378
						eu.dnetlib.domain.functionality.validator.Rule tempRule=rulesDao.get(ruleId);
379
						
380
						//special behaviour type of rule is chain
381
						if(tempRule.getType().equals("ChainRule")) {
382
							ChainRule<Rule> chainRule = this.handleChain(tempRule);
383
							rulesContent.add(chainRule);
384
						}
385
						else {
386
							rulesContent.add((Rule) this.getRuleClassInstanceByType(tempRule.getType(), tempRule.getConfiguration(), tempRule.getId()));
387
						}
388
					}
389
					
390
					jobContent = new Job(jobIdStored, 1, rulesContent, props);
391
					
392
					if (job.isRegistration()) {
393
						validator.submitJob(jobContent, 1, listenerContent, regListener);
394
					} else {
395
						validator.submitJob(jobContent, 1, listenerContent, compTestListener);
396
					}	
397
				}
398
			}
399
			
400
			//USAGE	
401
			if (job.getSelectedUsageRules() != null) {
402
			
403
				ValidatorJobListener listenerUsage = listenersManager.createListener();
404
				listenerUsage.setJobSubmittedId(jobIdStored);
405
				listenerUsage.setValidationType("usage");
406
				listenerUsage.setJobSubmittedUser(job.getUserEmail());
407
				listenerUsage.setInternalJobsSum(usageJobs);
408
				List <Job> jobsUsage = new ArrayList<Job>();
409
				for (Map.Entry<String, Set<Integer>> entry : verbRuleMap.entrySet()) {
410
					Properties pros = new Properties();
411
					pros.setProperty(OAIPMHSinglePageVerbProvider.VERB,entry.getKey());
412
					pros.setProperty(OAIPMHSinglePageVerbProvider.BASEURL, job.getBaseUrl());
413
					Set<Rule> rulesUsage = new HashSet<Rule>();
414
					for (Integer ruleId : entry.getValue()){
415
						eu.dnetlib.domain.functionality.validator.Rule tempRule = rulesDao.get(ruleId);
416
						logger.debug("prepare to add rule to registry with regexp: "+tempRule.getConfiguration().getProperty("regexp"));
417
						rulesUsage.add((Rule)this.getRuleClassInstanceByType(tempRule.getType(), tempRule.getConfiguration(), tempRule.getId()));
418
					}
419
					jobsUsage.add(new Job(jobIdStored, 2, rulesUsage, pros));
420
				}
421
				for (Job jobUsage : jobsUsage ) {
422
					if (job.isRegistration()) {
423
						validator.submitJob(jobUsage, 1, listenerUsage, regListener);
424
					} else {
425
						validator.submitJob(jobUsage, 1, listenerUsage, compTestListener);
426
					}
427
				}
428
			}			
429
			
430
			
431
		} catch (Exception e) {
432
			logger.error("error submitting job ", e);
433
			throw new ValidatorException(e);
434
		}
435
		
436
	}
437
	
438
	private void prepareCrisJobs(JobForValidation job,
439
			Map<String, Set<Rule>> entityChosenRulesMap,
440
			Map<String, Set<Rule>> entityChosenRulesMapReferential) throws ValidatorException, DaoException {
441
		//getting rules per entity and leave only chosen ones
442
		logger.debug("Selected Entities: " + job.getSelectedCrisEntities());
443
		for (String entity : RuleTypes.getEntities().keySet()) {
444
			
445
			logger.debug("checking entity: " + entity);
446
			Set<Rule> rulesBasic = new HashSet<Rule>();
447
			Set<Rule> rulesReferential = new HashSet<Rule>();
448
			if (job.getSelectedCrisEntities().contains(entity)) {
449
				logger.debug("entity: " + entity + " is selected");
450
				for (eu.dnetlib.domain.functionality.validator.Rule rule : rulesDao.getAllRulesByJobTypeEntityType("content", entity)) {
451
					if (job.getSelectedContentRules().contains(rule.getId())) {
452
						eu.dnetlib.domain.functionality.validator.Rule tempRule=rulesDao.get(rule.getId());
453
						if (rule.getName().contains("Relationship")) {
454
							if (job.isCrisReferentialChecks()) {
455
								rulesReferential.add((Rule) this.getRuleClassInstanceByType(tempRule.getType(), tempRule.getConfiguration(), tempRule.getId()));
456
							} 
457
						} else {
458
							rulesBasic.add((Rule) this.getRuleClassInstanceByType(tempRule.getType(), tempRule.getConfiguration(), tempRule.getId()));
459
						}
460
					}
461
				}
462
				logger.debug("Basic rules: " + rulesBasic.size());
463
				logger.debug("Referential rules: " + rulesReferential.size());
464
				entityChosenRulesMap.put(RuleTypes.getSetOfEntity(entity), rulesBasic);
465
				if (job.isCrisReferentialChecks() && !rulesReferential.isEmpty())
466
					entityChosenRulesMapReferential.put(RuleTypes.getSetOfEntity(entity), rulesReferential);
467
			} else {
468
				logger.debug("entity: " + entity + " is not selected");
469
			}
470
		}
471
		logger.debug("Return entities: " + entityChosenRulesMap.keySet());
472

  
473
		
474
	}
475

  
476

  
477
	public Rule getRuleClassInstanceByType(String type, CustomProperties pros, int id) throws ValidatorException {
478
		logger.debug("getting rule object of type "+type);
479
		String classname = RuleTypes.getClassOfType(type);
480
		if (classname == null){
481
			logger.debug("error getting rule object of type "+type+" classname=null");
482
			return null;
483
		}
484
		else {
485
			try {
486
				Class<?> clazz = Class.forName(classname);
487
				logger.debug("classname: "+clazz.getName());
488
				Properties properties = new Properties();
489
				properties.putAll(pros.getProperties());
490
				return (Rule) clazz.getConstructor(new Class[]{Properties.class,Integer.TYPE}).newInstance(properties,id);
491
				
492
			} catch (Exception e) {
493
				logger.debug("error getting rule object: "+e);
494
				return null;
495
			} 
496
		}
497
	}
498
	
499
	
500
	private ChainRule<Rule> handleChain(eu.dnetlib.domain.functionality.validator.Rule tempRule) throws ValidatorException, DaoException {
501
		logger.debug("chain rule found");
502
		List<Rule> rules_chain = new ArrayList<Rule>();
503
		eu.dnetlib.domain.functionality.validator.Rule tempRuleChain1=rulesDao.get(Integer.parseInt(tempRule.getConfiguration().getProperty("rule_1")));
504
		if(tempRuleChain1.getType().equals("ChainRule")) {
505
			ChainRule<Rule> chainRule1 = this.handleChain(tempRuleChain1);
506
			rules_chain.add(chainRule1);
507
		} else {
508
			rules_chain.add((Rule) this.getRuleClassInstanceByType(tempRuleChain1.getType(), tempRuleChain1.getConfiguration(),tempRuleChain1.getId()));
509
		}
510
		eu.dnetlib.domain.functionality.validator.Rule tempRuleChain2=rulesDao.get(Integer.parseInt(tempRule.getConfiguration().getProperty("rule_2")));
511
		if(tempRuleChain2.getType().equals("ChainRule")) {
512
			ChainRule<Rule> chainRule2 = this.handleChain(tempRuleChain2);
513
			rules_chain.add(chainRule2);
514
		} else {
515
			rules_chain.add((Rule) this.getRuleClassInstanceByType(tempRuleChain2.getType(), tempRuleChain2.getConfiguration(), tempRuleChain2.getId()));
516
		}
517
		Properties chainPros = new Properties();
518
		chainPros.setProperty(ChainRule.TYPE,tempRule.getConfiguration().getProperty(ChainRule.TYPE));
519
		ChainRule<Rule> chainRule = new ChainRule<Rule>(chainPros, tempRule.getId(), rules_chain);
520
		return chainRule;
521
	}
522

  
523
	public Validator getValidator() {
524
		return validator;
525
	}
526

  
527
	public ListenersManager getListenersManager() {
528
		return listenersManager;
529
	}
530

  
531
	public void setListenersManager(ListenersManager listenersManager) {
532
		this.listenersManager = listenersManager;
533
	}
534

  
535
	public void setValidator(Validator validator) {
536
		this.validator = validator;
537
	}
538

  
539
	public JobsDAO getJobsDao() {
540
		return jobsDao;
541
	}
542

  
543
	public void setJobsDao(JobsDAO jobsDao) {
544
		this.jobsDao = jobsDao;
545
	}
546

  
547
	public RulesetsDAO getRulesetsDao() {
548
		return rulesetsDao;
549
	}
550

  
551
	public void setRulesetsDao(RulesetsDAO rulesetsDao) {
552
		this.rulesetsDao = rulesetsDao;
553
	}
554

  
555
	public RulesDAO getRulesDao() {
556
		return rulesDao;
557
	}
558

  
559
	public void setRulesDao(RulesDAO rulesDao) {
560
		this.rulesDao = rulesDao;
561
	}
562

  
563
	public ValidatorRestore getValRestore() {
564
		return valRestore;
565
	}
566

  
567
	public void setValRestore(ValidatorRestore valRestore) {
568
		this.valRestore = valRestore;
569
	}
570
	
571
}
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impl/ValidatorServiceImpl.java
1
package eu.dnetlib.validator.service.impl;
2

  
3
import eu.dnetlib.api.functionality.ValidatorService;
4
import eu.dnetlib.api.functionality.ValidatorServiceException;
5
import eu.dnetlib.domain.ActionType;
6
import eu.dnetlib.domain.ResourceType;
7
import eu.dnetlib.domain.enabling.Notification;
8
import eu.dnetlib.domain.functionality.validator.JobForValidation;
9
import eu.dnetlib.domain.functionality.validator.RuleSet;
10
import eu.dnetlib.domain.functionality.validator.StoredJob;
11
import eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler;
12
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
13
import eu.dnetlib.validator.engine.ValidatorException;
14
import gr.uoa.di.driver.app.DriverServiceImpl;
15
import gr.uoa.di.driver.enabling.issn.NotificationListener;
16

  
17
import java.util.List;
18

  
19
import org.apache.log4j.Logger;
20

  
21
public class ValidatorServiceImpl extends DriverServiceImpl implements
22
		ValidatorService {
23

  
24
	private BlackboardNotificationHandler<BlackboardServerHandler> blackboardNotificationHandler = null;
25
	private Logger logger = Logger.getLogger(ValidatorServiceImpl.class);
26

  
27
	private ValidatorManager valManager;
28

  
29
	// do NOT call directly. It will be called by the
30
	// InitializingServiceRegistrationManager
31
	// after it sets the service EPR and id.
32
	@Override
33
	public void init() {
34
		super.init();
35
		if (this.blackboardNotificationHandler != null) { 
36
			this.subscribe(ActionType.UPDATE,
37
				ResourceType.VALIDATORSERVICERESOURCETYPE, this.getServiceEPR()
38
						.getParameter("serviceId"),
39
				"RESOURCE_PROFILE/BODY/BLACKBOARD/LAST_REQUEST",
40
				new NotificationListener() {
41
					@Override
42
					public void processNotification(Notification notification) {
43
						blackboardNotificationHandler.notified(
44
								notification.getSubscriptionId(),
45
								notification.getTopic(),
46
								notification.getIsId(),
47
								notification.getMessage());
48
						logger.debug(notification.getSubscriptionId());
49
						logger.debug(notification.getTopic());
50
						logger.debug(notification.getIsId());
51
						logger.debug(notification.getMessage());
52
					}
53
				});
54
		}
55
	}
56

  
57
	@Override
58
	public StoredJob getStoredJob(int jobId, String groupBy)
59
			throws ValidatorServiceException {
60
		try {
61
			logger.info("received request for job " + jobId);
62
			return valManager.getStoredJob(jobId, groupBy);
63
		} catch (ValidatorException e) {
64
			throw new ValidatorServiceException(e);
65
		}
66

  
67
	}
68

  
69
	@Override
70
	public List<StoredJob> getStoredJobs(String userMail, String jobType,
71
										 Integer offset, Integer limit, String dateFrom, String dateTo)
72
			throws ValidatorServiceException {
73
		try {
74
			logger.debug("received request for jobs of user " + userMail);
75
			return valManager.getStoredJobs(userMail, jobType, offset, limit, dateFrom, dateTo);
76
		} catch (ValidatorException e) {
77
			throw new ValidatorServiceException(e);
78
		}
79
	}
80

  
81
	@Override
82
	public List<StoredJob> getStoredJobsNew(String userMail, String jobType,
83
										 Integer offset, Integer limit, String dateFrom, String dateTo, String jobStatus)
84
			throws ValidatorServiceException {
85
		try {
86
			logger.debug("received request for jobs of user " + userMail);
87
			return valManager.getStoredJobs(userMail, jobType, offset, limit, dateFrom, dateTo, jobStatus);
88
		} catch (ValidatorException e) {
89
			throw new ValidatorServiceException(e);
90
		}
91
	}
92

  
93
	@Override
94
	public int getStoredJobsTotalNumber(String userMail, String jobType)
95
			throws ValidatorServiceException {
96
		try {
97
			logger.debug("received request for total number of jobs of user " + userMail);
98
			return valManager.getStoredJobsTotalNumber(userMail, jobType);
99
		} catch (ValidatorException e) {
100
			throw new ValidatorServiceException(e);
101
		}
102
	}
103

  
104
	@Override
105
	public int getStoredJobsTotalNumberNew(String userMail, String jobType, String jobStatus)
106
			throws ValidatorServiceException {
107
		try {
108
			logger.debug("received request for total number of jobs of user " + userMail);
109
			return valManager.getStoredJobsTotalNumber(userMail, jobType, jobStatus);
110
		} catch (ValidatorException e) {
111
			throw new ValidatorServiceException(e);
112
		}
113
	}
114

  
115
	@Override
116
	public List<RuleSet> getRuleSets() throws ValidatorServiceException {
117
		try {
118
			logger.info("received request for rulesets ");
119
			return valManager.getRuleSets();
120
		} catch (ValidatorException e) {
121
			throw new ValidatorServiceException(e);
122
		}
123
	}
124

  
125
	@Override
126
	public void submitValidationJob(JobForValidation job)
127
			throws ValidatorServiceException {
128
		try {
129
			logger.info("received request to submit job");
130
			this.valManager.submitJob(job);
131
		} catch (ValidatorException e) {
132
			throw new ValidatorServiceException(e);
133
		}
134
	}
135

  
136
	public BlackboardNotificationHandler<BlackboardServerHandler> getBlackboardNotificationHandler() {
137
		return blackboardNotificationHandler;
138
	}
139

  
140
	public ValidatorManager getValManager() {
141
		return valManager;
142
	}
143

  
144
	public void setValManager(ValidatorManager valManager) {
145
		this.valManager = valManager;
146
	}
147

  
148
	public void setBlackboardNotificationHandler(
149
			BlackboardNotificationHandler<BlackboardServerHandler> blackboardNotificationHandler) {
150
		this.blackboardNotificationHandler = blackboardNotificationHandler;
151
	}
152

  
153
}
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impl/NotificationListener.java
1
package eu.dnetlib.validator.service.impl;
2

  
3
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
4
import eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler;
5
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
6
import org.apache.log4j.Logger;
7

  
8
public class NotificationListener extends
9
        BlackboardNotificationHandler<BlackboardServerHandler> {
10

  
11
    private static Logger logger = Logger.getLogger(NotificationListener.class);
12
    private ValidatorManager valManager;
13

  
14
    @Override
15
    protected void processJob(BlackboardJob job) {
16
        super.processJob(job);
17

  
18
        logger.debug("New Job!");
19
//		String validationSet = job.getParameters().get("validationSet");
20
//		String validationType = job.getParameters().get("validationType");
21
        String type = job.getParameters().get("type"); //OAI
22
        String mdstoreId = job.getParameters().get("mdstoreId");
23
        String guidelines = job.getParameters().get("guidelines");
24
        String groupBy = job.getParameters().get("groupBy");
25
        String records = job.getParameters().get("records");
26
        String outputEpr = job.getParameters().get("outputEpr");
27
        if (outputEpr == null) {
28
            outputEpr = "false";
29
        }
30

  
31
        String blacklistGuidelines = job.getParameters().get("blacklistGuidelines");
32
        if (blacklistGuidelines == null)
33
            blacklistGuidelines = guidelines;
34

  
35
        String blacklistedRecords = job.getParameters().get("blacklistedRecords");
36
        if (blacklistedRecords == null) {
37
            blacklistedRecords = "false";
38
        }
39

  
40
        String submissionDate = job.getParameters().get("submissionDate");
41

  
42
        String jobStatusUpdateInterval = job.getParameters().get("jobStatusUpdateInterval");
43

  
44
        //the datasource id
45
        String datasourceId = job.getParameters().get("datasourceId");
46

  
47
        //the datasource baseUrl, optional for type=OAI, mandatory for type=DNET
48
        String baseUrl = job.getParameters().get("baseUrl");
49

  
50
        //the namespace prefix of the datasource, optional
51
        String datasourceNamespacePrefix = job.getParameters().get("datasourceNamespacePrefix");
52

  
53
        //the datasource name, optional
54
        String datasourceName = job.getParameters().get("datasourceName");
55

  
56
        //json map of other parameters that may be of interest, optional
57
        String extra_param = job.getParameters().get("extra_param");
58

  
59
        try {
60
            int workers = 1;
61
            int recordsInt = 0;
62
            int jobStatusUpdateIntervalInt = 100;
63
            if (records == null || records.equalsIgnoreCase("all"))
64
                recordsInt = -1;
65
            else
66
                recordsInt = Integer.parseInt(records);
67
            if (recordsInt > -1 && recordsInt < 100)
68
                workers = 1;
69
            else
70
                workers = 4;
71

  
72
            if (job.getParameters().get("workers") != null)
73
                workers = Integer.parseInt(job.getParameters().get("workers"));
74

  
75
            if (jobStatusUpdateInterval != null)
76
                jobStatusUpdateIntervalInt = Integer.parseInt(jobStatusUpdateInterval);
77

  
78
            if (recordsInt != 0) {
79
                logger.debug("New Job of type: " + type + " - workers: " + workers + " - recordsInt: " + recordsInt);
80
                logger.debug("workers: " + workers);
81
                logger.debug("recordsInt: " + recordsInt);
82
                logger.debug("jobStatusUpdateInterval: " + jobStatusUpdateIntervalInt);
83
                if (job.getAction().equalsIgnoreCase("VALIDATE")) {
84
                    if (type.equalsIgnoreCase("OAI")) {
85
                        logger.debug("type is oai");
86

  
87
                    } else if (type.equalsIgnoreCase("DNET")) {
88
                        logger.debug("type is dnet");
89
                        logger.debug("groupBy: " + groupBy);
90
//						this.getBlackboardHandler().ongoing(job);
91
                        valManager.beginDataJobForWorkflow(mdstoreId, guidelines, groupBy, recordsInt, workers, job, this, jobStatusUpdateIntervalInt, Boolean.parseBoolean(outputEpr), Boolean.parseBoolean(blacklistedRecords), blacklistGuidelines);
92
                    }
93
                } else {
94
                    throw new Exception("Unknown action");
95
                }
96
            } else {
97
                throw new Exception("There are no records in the MDStore.");
98
            }
99
        } catch (Exception e) {
100
            logger.error("Failed to begin the workflow for the job received", e);
101
            getBlackboardHandler().failed(job, e);
102
        }
103
    }
104

  
105
    public ValidatorManager getValManager() {
106
        return valManager;
107
    }
108

  
109
    public void setValManager(ValidatorManager valManager) {
110
        this.valManager = valManager;
111
    }
112
}
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/persistance/ThreadSafeRegistry.java
1
package eu.dnetlib.validator.service.impls.persistance;
2

  
3
/**
 * Operations a registry offers in addition to plain storage: explicit
 * locking and removal of stored objects.
 * <p>
 * NOTE(review): presumably callers bracket registry access with
 * {@link #lock()} / {@link #unlock()}; the interface itself does not enforce
 * this - confirm against the callers.
 */
public interface ThreadSafeRegistry {

	/** Acquires the registry lock. */
	void lock();

	/** Releases the registry lock. */
	void unlock();

	/**
	 * Removes the object stored under the given id.
	 *
	 * @param id id of the object to remove
	 */
	void removeObject(int id);
}
0 9

  
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/persistance/MemoryRegistry.java
1
package eu.dnetlib.validator.service.impls.persistance;
2

  
3
import java.io.Serializable;
4
import java.util.HashMap;
5
import java.util.Map;
6

  
7
import eu.dnetlib.validator.engine.persistance.Registry;
8

  
9
/**
10
 * A Registry that stores the objects in an in-memory {@link Map}
11
 * @author Manos Karvounis
12
 * @param <T>
13
 */
14
public class MemoryRegistry<T extends Serializable> extends Registry<T>  {
15

  
16
	protected Map<Integer, T> registry = null;
17

  
18
	public MemoryRegistry(String name) {
19
		super(name);
20
		this.registry = new HashMap<Integer, T>();
21
	}
22

  
23
	@Override
24
	public T getObject(int id) {
25
		return registry.get(id);
26
	}
27

  
28
	@Override
29
	public void addObject(int id, T obj) {
30
		registry.put(id, obj);
31
	}
32

  
33
	public Map<Integer, T> getAllObjects() {
34
		return this.registry;
35
	}
36
}
0 37

  
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/persistance/ThreadSafeMemoryRegistry.java
1
package eu.dnetlib.validator.service.impls.persistance;
2

  
3
import java.io.Serializable;
4
import java.util.concurrent.locks.Lock;
5
import java.util.concurrent.locks.ReentrantLock;
6

  
7
/**
8
 * A {@link MemoryRegistry} that may be accesses by multiple threads silmutaneously.
9
 * @author Manos Karvounis
10
 * @param <T>
11
 */
12
public class ThreadSafeMemoryRegistry<T extends Serializable> extends MemoryRegistry<T> implements ThreadSafeRegistry {
13
	
14
	private final Lock lock = new ReentrantLock();
15
	
16
	public ThreadSafeMemoryRegistry(String name) {
17
		super(name);
18
	}
19

  
20
	@Override
21
	public void lock() {
22
		lock.lock();
23
	}
24

  
25
	@Override
26
	public void unlock() {
27
		lock.unlock();
28
	}
29

  
30
	@Override
31
	public void removeObject(int id) {
32
		registry.remove(id);
33
	}
34

  
35
}
0 36

  
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/persistance/BlacklistServlet.java
1
package eu.dnetlib.validator.service.impls.persistance;
2

  
3
import java.io.BufferedInputStream;
4
import java.io.File;
5
import java.io.FileInputStream;
6
import java.io.IOException;
7
import java.io.InputStream;
8
import java.io.PrintWriter;
9

  
10
import javax.servlet.ServletException;
11
import javax.servlet.http.HttpServlet;
12
import javax.servlet.http.HttpServletRequest;
13
import javax.servlet.http.HttpServletResponse;
14

  
15
import org.apache.commons.io.IOUtils;
16
import org.apache.log4j.Logger;
17
import org.springframework.context.ApplicationContext;
18
import org.springframework.web.context.support.WebApplicationContextUtils;
19

  
20
public class BlacklistServlet extends HttpServlet {
21

  
22
	/**
23
	 * 
24
	 */
25
	private static final long serialVersionUID = 1788389574262137242L;
26
	private static Logger logger = Logger.getLogger(BlacklistServlet.class);
27
	
28
	private static final String REQUEST_GETBLACKLIST = "GetBlacklistedRecords";
29
//	private TasksDAO tasksDao;
30
	private String path;
31
	
32
	@Override
33
	public void init() throws ServletException {
34
		ApplicationContext context = WebApplicationContextUtils
35
                .getWebApplicationContext(getServletContext());
36
		
37
//		this.tasksDao = (TasksDAO) context.getBean("tasksDao");
38
//		this.path = "/tmp/validator-wf/";
39
		this.path = "/var/lib/dnet/validator/workflow_blacklists/";
40
//		this.path = context.getEnvironment().getProperty("blacklistedRecordsPath");
41
	}
42
	
43
	@Override
44
	public void doGet(HttpServletRequest req, HttpServletResponse resp)
45
			throws ServletException, IOException {
46
		this.handleRequest(req, resp);
47
	}
48

  
49
	@Override
50
	public void doPost(HttpServletRequest req, HttpServletResponse resp)
51
			throws ServletException, IOException {
52
		this.handleRequest(req, resp);
53
	}
54

  
55
	public void handleRequest(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
56

  
57
		InputStream is = null;
58
		
59
		
60
		String dataSourceId = req.getParameter("datasourceId");
61
		String requestType = req.getParameter("request");
62
		try {
63
			if (requestType.equals(REQUEST_GETBLACKLIST)) {
64
				logger.debug("parameter: " + dataSourceId);
65
				File file = new File(path + dataSourceId);
66
				if (file.isFile()) {
67
					logger.debug("file exists");
68
					is = new BufferedInputStream(new FileInputStream(file));
69
					resp.setStatus(200);
70
					resp.setContentType("text/plain");
71
					resp.setContentLength((int) file.length());
72
					
73
//					InputStream fileStream = new FileInputStream(file);
74
//					InputStream gzipStream = new GZIPInputStream(fileStream);
75
//					Reader decoder = new InputStreamReader(gzipStream, "UTF-8");
76
//					BufferedReader buffered = new BufferedReader(decoder);
77
//					IOUtils.copyLarge(buffered, resp.getWriter());
78
					
79
					IOUtils.copyLarge(is, resp.getOutputStream());
80
//					tasksDao.get(4335);
81
				} else {
82
					PrintWriter out = resp.getWriter();
83
					resp.setStatus(400);
84
					resp.setContentType("text/plain");
85
					out.write("No file for this id: " + dataSourceId);
86
					out.close();
87
				}
88
			} else {
89
				PrintWriter out = resp.getWriter();
90
				resp.setStatus(500);
91
				resp.setContentType("text/plain");
92
				out.write("Request: " + requestType + " is not supported");
93
				out.close();
94
			}
95
		} catch (Exception e) {
96
			resp.setStatus(500);
97
			e.printStackTrace(resp.getWriter());
98
		} finally {
99
			if (is != null)
100
				is.close();
101
		}
102
		
103
	}
104
}
0 105

  
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/valobjs/TextValidationObject.java
1
package eu.dnetlib.validator.service.impls.valobjs;
2

  
3
import eu.dnetlib.validator.engine.execution.ValidationObject;
4

  
5
/**
6
 * Indicates that a Validatiob Object is actually plain text.
7
 * @author Manos Karvounis
8
 */
9
public interface TextValidationObject extends ValidationObject {
10

  
11
	public String getContentAsText();
12
	
13
}
0 14

  
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/valobjs/XMLTextValidationObject.java
1
package eu.dnetlib.validator.service.impls.valobjs;
2

  
3
import java.io.StringWriter;
4

  
5
import javax.xml.transform.Result;
6
import javax.xml.transform.Source;
7
import javax.xml.transform.Transformer;
8
import javax.xml.transform.TransformerConfigurationException;
9
import javax.xml.transform.TransformerException;
10
import javax.xml.transform.TransformerFactory;
11
import javax.xml.transform.TransformerFactoryConfigurationError;
12
import javax.xml.transform.dom.DOMSource;
13
import javax.xml.transform.stream.StreamResult;
14
import javax.xml.xpath.XPath;
15
import javax.xml.xpath.XPathConstants;
16
import javax.xml.xpath.XPathExpression;
17
import javax.xml.xpath.XPathExpressionException;
18
import javax.xml.xpath.XPathFactory;
19

  
20
import org.apache.log4j.Logger;
21
import org.w3c.dom.Document;
22
import org.w3c.dom.NodeList;
23

  
24
import eu.dnetlib.validator.engine.data.DataException;
25
/**
26
 * Indicates that a Validation Object is actually an xml text.
27
 * @author Manos Karvounis
28
 */
29
public class XMLTextValidationObject implements TextValidationObject {
30

  
31
	private transient Logger log = Logger.getLogger(XMLTextValidationObject.class);
32
	
33
	private String id = "";
34
	private String status;
35
	private final Document doc;
36
	private XPathFactory factory = XPathFactory.newInstance();
37

  
38
	public XMLTextValidationObject(Document doc) {
39
		super();
40
		this.doc = doc;
41
	}
42

  
43
	@Override
44
	public String getId() {
45
		return this.id;
46
	}
47
	
48
	@Override
49
	public void setId(String id) {
50
		this.id = id;
51
	}
52

  
53
	public String getStatus() {
54
		return status;
55
	}
56

  
57
	public void setStatus(String status) {
58
		this.status = status;
59
	}
60

  
61
	@Override
62
	public Object getContentAsObject() {
63
		return this.doc;
64
	}
65

  
66
	@Override
67
	public String getContentAsText() {
68
		StringWriter sw = new StringWriter();
69
	    Transformer transformer = null;
70
		try {
71
			transformer = TransformerFactory.newInstance().newTransformer();
72
		    Source source = new DOMSource(doc);
73
		    Result output = new StreamResult(sw);
74
			transformer.transform(source, output);
75
		} catch (TransformerConfigurationException e) {
76
			log.error("", e);
77
		} catch (TransformerFactoryConfigurationError e) {
78
			log.error("", e);
79
		} catch (TransformerException e) {
80
			log.error("", e);
81
		}
82
	    return sw.toString();
83
	}
84
	
85
	public Document getDocument() {
86
		return this.doc;
87
	}
88

  
89
	/**
90
	 * Get the nodes retrieved by issuing the given XPATH query.
91
	 * @param xpath
92
	 * @return
93
	 * @throws DataException
94
	 */
95
	public NodeList getNodes(String xpath) throws DataException {
96
		log.debug("xpath: "+xpath);
97
		try {
98
			XPath xxpath = factory.newXPath();
99
			XPathExpression expr = xxpath.compile(xpath);
100
			return (NodeList) expr.evaluate(doc, XPathConstants.NODESET);
101
		} catch (XPathExpressionException e) {
102
			throw new DataException();
103
		}
104
	}
105

  
106
}
0 107

  
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/executors/JobWorker.java
1
package eu.dnetlib.validator.service.impls.executors;
2

  
3
import java.util.ArrayList;
4
import java.util.Calendar;
5
import java.util.HashMap;
6
import java.util.List;
7
import java.util.Map;
8
import java.util.Set;
9

  
10
import org.apache.log4j.Logger;
11
import org.springframework.core.task.SyncTaskExecutor;
12

  
13
import eu.dnetlib.validator.engine.ValidatorException;
14
import eu.dnetlib.validator.engine.data.Provider;
15
import eu.dnetlib.validator.engine.data.Provider.ProviderException;
16
import eu.dnetlib.validator.engine.data.ResultSet;
17
import eu.dnetlib.validator.engine.data.Rule;
18
import eu.dnetlib.validator.engine.execution.Executor;
19
import eu.dnetlib.validator.engine.execution.Job;
20
import eu.dnetlib.validator.engine.execution.JobListener;
21
import eu.dnetlib.validator.engine.execution.Task;
22
import eu.dnetlib.validator.engine.execution.TaskList;
23
import eu.dnetlib.validator.engine.execution.ValidationObject;
24
import eu.dnetlib.validator.service.impls.providers.DnetProvider;
25
import eu.dnetlib.validator.service.impls.providers.OAIPMHRecordProvider;
26

  
27
/**
28
 * Retrieves the Validation Objects of a {@link Job} using the contained
29
 * Provider. A new Task in created for each Validation Object and all the rules
30
 * contained in the Job. Each task is submitted for execution at an
31
 * {@link Executor}
32
 * 
33
 * @author Nikon Gasparis
34
 *
35
 */
36
public class JobWorker implements Runnable {
37

  
38
	private transient Logger log = Logger.getLogger(JobWorker.class);
39

  
40
	private final SyncTaskExecutor taskExecutor;
41
	public final long generalTimeout;
42
	public final String set;
43
	public final int jobId;
44
	public final Set<Rule> rules;
45
	public final Provider provider;
46
	public final JobListener validatorListener;
47
	public final JobListener[] listeners;
48

  
49
	protected Map<String, Object> jobContext;
50

  
51
	private Map<String, Object> recordContext;
52
	private int deletedRecords = 0;
53

  
54
	public JobWorker(int jobId, Set<Rule> rules, String set, Provider provider,
55
			JobListener validatorListener, SyncTaskExecutor taskExecutor,
56
			long generalTimeout, JobListener... listeners) {
57
		super();
58
		log.debug("Creating a new job Worker with generalTimeout " + generalTimeout);
59
		this.taskExecutor = taskExecutor;
60
		this.generalTimeout = generalTimeout;
61
		this.set = set;
62
		this.jobId = jobId;
63
		this.rules = rules;
64
		this.provider = provider;
65
		this.jobContext = new HashMap<String, Object>();
66
		this.listeners = listeners;
67
		this.validatorListener = validatorListener;
68

  
69
	}
70

  
71
	public void submit() throws JobWorkerException {
72
		long elapsed = 0;
73
		int count = 0;
74
		int record_limit = -1;
75
		log.debug("Submitting job");
76
		if (provider.getConfiguration().getProperty(
77
				OAIPMHRecordProvider.RECORDS) != null) {
78
			if (!provider.getConfiguration()
79
					.getProperty(OAIPMHRecordProvider.RECORDS).trim()
80
					.equals("-1")) {
81
				record_limit = Integer.parseInt(provider.getConfiguration()
82
						.getProperty(OAIPMHRecordProvider.RECORDS).trim());
83
			}
84
		}
85
		if (provider.getConfiguration().getProperty(DnetProvider.RECORDS) != null) {
86
			if (!provider.getConfiguration().getProperty(DnetProvider.RECORDS)
87
					.trim().equals("-1")) {
88
				record_limit = Integer.parseInt(provider.getConfiguration()
89
						.getProperty(DnetProvider.RECORDS).trim());
90
			}
91
		}
92
		
93
//		int batchSize = -1;
94
//		if (provider.getConfiguration().getProperty(DnetProvider.BATCH_SIZE) != null) {
95
//			 batchSize = Integer.parseInt(provider.getConfiguration()
96
//						.getProperty(DnetProvider.BATCH_SIZE));
97
//		}
98
		log.debug("Number of records to validate: " + record_limit
99
				+ " (-1 for all)");
100

  
101
		Boolean crisFlag = false;
102
		if (set != null && set.contains("_cris_"))
103
			crisFlag = true;
104

  
105
		boolean success = false;
106
		List<Task> tasks = new ArrayList<Task>();
107
		ResultSet<ValidationObject> ivobjs = null;
108
		try {
109
			if (!crisFlag) {
110
				log.debug("request to get validation objects without set..");
111
				ivobjs = provider.getValidationObjects();
112
			} else {
113
				log.debug("request to get validation objects with set: " + set);
114
				ivobjs = provider.getValidationObjects(set);
115
			}
116
		} catch (ProviderException e1) {
117
			log.error("Error while getting validation objects from provider",
118
					e1);
119
			for (JobListener listener : listeners) {
120
				listener.failed(jobId, this.jobContext, e1);
121
			}
122
		}
123
		try {
124
			long recordsTime = 0;
125
			while ((count < record_limit || record_limit == -1)
126
					&& ivobjs.next()) {
127
				this.recordContext = new HashMap<String, Object>();
128
				tasks.clear();
129
				ValidationObject vobj = ivobjs.get();
130

  
131
				log.debug("Checking if rules will be applied on object "
132
						+ vobj.getId());
133
				if (vobj.getStatus() != null
134
						&& vobj.getStatus().equalsIgnoreCase("deleted")) {
135
					log.debug("Object is deleted and will be ignored");
136
					deletedRecords++;
137
					continue;
138
				}
139

  
140
				
141
				if (crisFlag
142
						&& (vobj.getId().contains("cfEAddr") || vobj.getId()
143
								.contains("cfEquip"))) {
144
					log.debug("ignoring cfEaddr and cfEquip records from persons and datasets");
145
					continue;
146
				}
147

  
148
				log.debug("Applying rules on object " + vobj.getId());
149
				count++;
150
				for (Rule rule : rules) {
151
					if (rule != null) {
152
						rule.setValObjId(vobj.getId());
153
						rule.setProvider(provider);
154
						Task task = new Task(vobj, rule);
155
						tasks.add(task);
156
					}
157
				}
158

  
159
				TaskList ltasks = new TaskList(tasks);
160
				try {
161
					long time1 = Calendar.getInstance().getTimeInMillis();
162
//					Object lock = new Object();
163
					taskExecutor.execute(ltasks);
164
					long time2 = Calendar.getInstance().getTimeInMillis();
165
					log.debug("Task execution took " + ((time2 - time1))
166
							+ " milli seconds");
167
					elapsed += time2 - time1;
168
					log.debug("Elapsed time till now is " + elapsed/1000
169
							+ " seconds");
170
					recordsTime += time2 - time1;
171
//				    if (count % batchSize == 0) {	
172
//						log.debug("Elapsed time for " + batchSize + " records is  " + recordsTime
173
//								+ " milli seconds");
174
//						recordsTime = 0;
175
//					}
176
//					if (elapsed > this.generalTimeout)
177
//						throw new JobWorkerException("Job timed out");
178
					long time3 = Calendar.getInstance().getTimeInMillis();
179
					for (JobListener listener : listeners) {
180
						listener.currentResults(ltasks.getCtasks(), jobId,
181
								vobj.getContentAsObject(), this.recordContext);
182
					}
183
					long time4 = Calendar.getInstance().getTimeInMillis();
184
					log.debug("Informing listeners took " + ((time4 - time3))
185
							+ " milli seconds");
186
					success = true;
187
				} catch (Exception e) {
188
					log.error("an error occured while executing tasks: ", e);
189
					for (JobListener listener : listeners) {
190
						listener.currentResults(ltasks.getCtasks(), jobId,
191
								vobj.getContentAsObject(), this.recordContext,
192
								e);
193
					}
194
				}
195

  
196
			}
197
		} catch (Exception e) {
198
			log.error("data error", e);
199
			for (JobListener listener : listeners) {
200
				listener.failed(jobId, this.jobContext, e);
201
			}
202
		}
203
		if (success) {
204
			jobContext.put("deletedRecords", deletedRecords);
205
			for (JobListener listener : listeners) {
206
				listener.finished(jobId, this.jobContext);
207
			}
208
		} else {
209
			log.error("an error occured");
210
			for (JobListener listener : listeners) {
211
				if (ivobjs.getError() == null)
212
					listener.failed(jobId, this.jobContext,
213
							new JobWorkerException("All tasks failed"));
214
				else {
215
					listener.failed(jobId, this.jobContext,
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff