Project

General

Profile

1
package eu.dnetlib.validator.service.impls;
2

    
3
import java.io.Serializable;
4
import java.util.ArrayList;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.Properties;
8
import java.util.Set;
9
import java.util.UUID;
10

    
11
import net.sf.ehcache.Cache;
12
import net.sf.ehcache.CacheManager;
13

    
14
import org.apache.log4j.Logger;
15
import org.quartz.spi.ThreadPool;
16
import org.springframework.core.task.SyncTaskExecutor;
17
import org.springframework.core.task.TaskExecutor;
18

    
19
import eu.dnetlib.validator.engine.Validator;
20
import eu.dnetlib.validator.engine.ValidatorException;
21
import eu.dnetlib.validator.engine.data.Provider;
22
import eu.dnetlib.validator.engine.data.Rule;
23
import eu.dnetlib.validator.engine.execution.CompletedTask;
24
import eu.dnetlib.validator.engine.execution.Job;
25
import eu.dnetlib.validator.engine.execution.JobListener;
26
import eu.dnetlib.validator.service.impls.executors.JobWorker;
27
import eu.dnetlib.validator.service.impls.listeners.CrisListener;
28
import eu.dnetlib.validator.service.impls.listeners.ListenersManager;
29
import eu.dnetlib.validator.service.impls.providers.CrisProvider;
30
import eu.dnetlib.validator.service.impls.providers.DnetProviderNew;
31
import eu.dnetlib.validator.service.impls.providers.OAIPMHRecordProvider;
32
import eu.dnetlib.validator.service.impls.providers.ProvidersManager;
33

    
34
/**
35
 * A simple validator that stores rules, providers, and executing/pending jobs
36
 * in memory. It uses an SyncTaskExecutor to submit and execute jobs.
37
 * 
38
 * @author Nikon Gasparis
39
 * @see ThreadPool
40
 */
41
public class SpringValidator implements Validator {
42

    
43
	private CacheManager cacheManager;
44

    
45
	private ProvidersManager providersManager;
46

    
47
	private ListenersManager listenersManager;
48

    
49
	private transient Logger log = Logger.getLogger(SpringValidator.class);
50

    
51
	private final TaskExecutor jobExecutor;
52

    
53
	private final SyncTaskExecutor taskExecutor;
54
	
55
	private final long generalTimeout;
56

    
57
	private Boolean dnetWorkflow;
58

    
59
	/**
60
	 * Creates a new validator.
61
	 * 
62
	 * @param jobExecutor
63
	 *            The module used to run the providers and submit jobs.
64
	 * @param taskExecutor
65
	 *            The executor used to execute the jobs.
66
	 * @param generalTimeout
67
	 *            How long to wait for a job to be executed (in seconds). A job
68
	 *            is composed of the application of all the rules on all of the
69
	 *            Validation Objects.
70
	 */
71
	
72
	public SpringValidator(TaskExecutor jobExecutor, SyncTaskExecutor taskExecutor, long generalTimeout) {
73
		super();
74
		log.info("Creating a new Validator");
75
		this.generalTimeout = generalTimeout;
76
		this.taskExecutor = taskExecutor;
77
		this.jobExecutor = jobExecutor;
78
	}
79

    
80
	@Override
81
	public <T extends Serializable> void addToRegistry(int objid, T obj,
82
			String registryName) {
83
		throw new UnsupportedOperationException(
84
				"You may not add new registries to this Validator implementation");
85
	}
86

    
87
	@Override
88
	public Serializable getFromRegistry(int objid, String registryName)
89
			throws ValidatorException {
90
		throw new UnsupportedOperationException(
91
				"You may not add new registries to this Validator implementation");
92
	}
93

    
94
	@Override
95
	public <T extends Serializable> void addRegistry(String name) {
96
		throw new UnsupportedOperationException(
97
				"You may not add new registries to this Validator implementation");
98
	}
99

    
100
	@Override
101
	public void submitJob(Job job, int workers, JobListener... listeners)
102
			throws ValidatorException {
103
		log.debug("Submitting job " + job.id);
104
		List<Provider> providers = new ArrayList<Provider>();
105

    
106
		try {
107
			log.debug("Creating a new provider instance");
108
			for (int i = 0; i < workers; i++) {
109
				Provider prv = null;
110
				if (job.providerId == 1)
111
					prv = providersManager.createOaipmhRecordProvider();
112
				else if (job.providerId == 2)
113
					prv = providersManager.createOaipmhSinglePageVerbProvider();
114
				else if (job.providerId == 3)
115
					prv = providersManager.createDnetProvider();
116
				providers.add(prv);
117
			}
118
		} catch (Exception e) {
119
			log.error("Error creating provider instance", e);
120
		}
121
		ValidatorJobMainListener mainListener = new ValidatorJobMainListener();
122
		// mainListener.setWorkers(workers);
123
		Integer workerId = 0;
124
		for (Provider prov : providers) {
125
			Properties props = job.providerProps;
126
			if (job.providerProps.getProperty(DnetProviderNew.MDSTORE_ID) != null) {
127
				props = new Properties();
128
				props.setProperty(DnetProviderNew.WORKER_ID,
129
						Integer.toString(workerId));
130
				props.setProperty(DnetProviderNew.WORKERS,
131
						Integer.toString(workers));
132
				props.setProperty(DnetProviderNew.MDSTORE_ID,
133
						job.providerProps.getProperty(DnetProviderNew.MDSTORE_ID));
134
				props.setProperty(DnetProviderNew.BATCH_SIZE,
135
						job.providerProps.getProperty(DnetProviderNew.BATCH_SIZE));
136
				props.setProperty(DnetProviderNew.RECORDS,
137
						job.providerProps.getProperty(DnetProviderNew.RECORDS));
138
			}
139
			workerId++;
140
			prov.setConfiguration(props);
141
			String set = props.getProperty(OAIPMHRecordProvider.SET);
142
			if (set == null)
143
				set = "none";
144
			JobWorker jobWorker = new JobWorker(job.id, job.rules, set, prov,
145
					mainListener, taskExecutor, generalTimeout, listeners);
146
			jobExecutor.execute(jobWorker);
147
		}
148
	}
149

    
150
	@Override
151
	public void submitJobForCris(Job job,
152
			Map<String, Set<Rule>> rulesPerEntity,
153
			Map<String, Set<Rule>> rulesPerEntityRef, JobListener... listeners)
154
			throws ValidatorException {
155

    
156
		CrisProvider prv = providersManager.createCrisProvider();
157

    
158
		log.debug("creating cris  - timeout: " + prv.getTimeout());
159
		String cacheName = UUID.randomUUID().toString();
160

    
161
		Properties props = job.providerProps;
162

    
163
		if (!rulesPerEntityRef.isEmpty()) {
164
			cacheManager.addCache(new Cache(cacheName, 20000, true, true,
165
					10000, 10000));
166
			log.debug("caches: ");
167
			for (String cache : cacheManager.getCacheNames()) {
168
				log.debug("name: " + cache);
169
			}
170
			prv.setCache(cacheManager.getCache(cacheName));
171
		}
172

    
173
		prv.setEntities(rulesPerEntity.keySet());
174
		prv.setBaseUrl(props.getProperty(OAIPMHRecordProvider.BASEURL));
175
		prv.setMetadataPrefix(props
176
				.getProperty(OAIPMHRecordProvider.METADATA_PREFIX));
177
		prv.setRecords(props.getProperty(OAIPMHRecordProvider.RECORDS));
178

    
179
		ValidatorJobMainListener mainListener = new ValidatorJobMainListener();
180
		// mainListener.setWorkers(rulesPerEntity.keySet().size() +
181
		// rulesPerEntityRef.keySet().size());
182

    
183
		CrisListener crisListener = listenersManager.createCrisListener();
184
		crisListener.setWorkersFirstPhase(rulesPerEntity.keySet().size());
185

    
186
		
187
		//TODO UPDATE LISTENER
188

    
189
		crisListener.setExecutor(jobExecutor);
190
		//		crisListener.setExecutor(taskExecutor);
191
		crisListener.setProvider(prv);
192
		crisListener.setWorkers(rulesPerEntity.keySet().size()
193
				+ rulesPerEntityRef.keySet().size());
194
		if (!rulesPerEntityRef.isEmpty()) {
195
			crisListener.setCacheManager(cacheManager);
196
			crisListener.setCacheName(cacheName);
197
		}
198

    
199
		mainListener.setCrisListener(crisListener);
200
		for (Map.Entry<String, Set<Rule>> entry : rulesPerEntity.entrySet()) {
201
			log.debug("Cris# set: " + entry.getKey() + " - Rules size: "
202
					+ entry.getValue().size());
203
			prv.setSet(entry.getKey());
204
			props.setProperty(OAIPMHRecordProvider.SET, entry.getKey());
205
			prv.setConfiguration(props);
206
			JobWorker jobWorker = new JobWorker(job.id, entry.getValue(), entry.getKey(), prv,
207
					mainListener, taskExecutor, generalTimeout,
208
					listeners);
209
			jobExecutor.execute(jobWorker);
210
		}
211

    
212
		log.error("Creating Sumbitters for referential check..");
213
		List<JobWorker> submittersForReferential = new ArrayList<JobWorker>();
214
		for (Map.Entry<String, Set<Rule>> entry : rulesPerEntityRef.entrySet()) {
215
			log.error("Ref Submitter set: " + entry.getKey() + " and rules: "
216
					+ entry.getValue() + " and new rule ids: "
217
					+ entry.getValue().size());
218
			// logger.debug("Cris# set: "+ entry.getKey() + " - Rules: " +
219
			// entry.getValue());
220
			prv.setSet(entry.getKey());
221
			props.setProperty(OAIPMHRecordProvider.SET, entry.getKey());
222
			prv.setConfiguration(props);
223
			submittersForReferential.add(new JobWorker(job.id,
224
					entry.getValue(), entry.getKey(), prv, mainListener,
225
					taskExecutor, generalTimeout, listeners));
226
		}
227
		//TODO UPDATE LISTENER
228
//		crisListener.setSubmittersForReferential(submittersForReferential);
229
		crisListener.setSubmittersForReferential(submittersForReferential);
230
		log.error("Sumbitters created for referential check: "
231
				+ submittersForReferential.size());
232

    
233
	}
234

    
235
	@Override
236
	public void start() throws ValidatorException {
237
		log.debug("Starting validator");
238
	}
239

    
240
	
241
	private class ValidatorJobMainListener implements JobListener {
242

    
243
		private CrisListener crisListener = null;
244

    
245
		public void setCrisListener(CrisListener crisListener) {
246
			this.crisListener = crisListener;
247
		}
248

    
249
		@Override
250
		public void currentResults(List<CompletedTask> tasks, int jobId,
251
				Object object, Map<String, Object> recordContext, Throwable t) {
252
			// TODO Auto-generated method stub
253

    
254
		}
255

    
256
		@Override
257
		public void currentResults(List<CompletedTask> tasks, int jobId,
258
				Object object, Map<String, Object> recordContext) {
259
			// TODO Auto-generated method stub
260

    
261
		}
262

    
263
		@Override
264
		public synchronized void finished(int jobId,
265
				Map<String, Object> jobContext) {
266
			if (crisListener != null)
267
				crisListener.finished(jobId, jobContext);
268
			log.debug("Job " + jobId
269
					+ " has finished, removing it from the registry");
270
		}
271

    
272
		@Override
273
		public synchronized void failed(int jobId,
274
				Map<String, Object> jobContext, Throwable t) {
275
			if (crisListener != null)
276
				crisListener.failed(jobId, jobContext, t);
277
			log.debug("A job has finished-failed, removing it from the registry");
278
		}
279

    
280
	}
281

    
282
	public Boolean getDnetWorkflow() {
283
		return dnetWorkflow;
284
	}
285

    
286
	public void setDnetWorkflow(Boolean dnetWorkflow) {
287
		this.dnetWorkflow = dnetWorkflow;
288
	}
289

    
290
	public ProvidersManager getProvidersManager() {
291
		return providersManager;
292
	}
293

    
294
	public void setProvidersManager(ProvidersManager providersManager) {
295
		this.providersManager = providersManager;
296
	}
297

    
298
	public CacheManager getCacheManager() {
299
		return cacheManager;
300
	}
301

    
302
	public void setCacheManager(CacheManager cacheManager) {
303
		this.cacheManager = cacheManager;
304
	}
305

    
306
	public ListenersManager getListenersManager() {
307
		return listenersManager;
308
	}
309

    
310
	public void setListenersManager(ListenersManager listenersManager) {
311
		this.listenersManager = listenersManager;
312
	}
313

    
314
	@Override
315
	public void shutdown() throws ValidatorException {
316
		// TODO Auto-generated method stub
317
		
318
	}
319

    
320
}
(2-2/3)