Project

General

Profile

1
package eu.dnetlib.validator.service.impls;
2

    
3
import java.io.Serializable;
4
import java.util.ArrayList;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.Properties;
8
import java.util.Set;
9
import java.util.UUID;
10
import java.util.concurrent.ExecutorService;
11

    
12
import net.sf.ehcache.Cache;
13
import net.sf.ehcache.CacheManager;
14

    
15
import org.apache.log4j.Logger;
16

    
17
import eu.dnetlib.validator.engine.Validator;
18
import eu.dnetlib.validator.engine.ValidatorException;
19
import eu.dnetlib.validator.engine.data.Provider;
20
import eu.dnetlib.validator.engine.data.Rule;
21
import eu.dnetlib.validator.engine.execution.CompletedTask;
22
import eu.dnetlib.validator.engine.execution.Executor;
23
import eu.dnetlib.validator.engine.execution.Job;
24
import eu.dnetlib.validator.engine.execution.JobListener;
25
import eu.dnetlib.validator.service.impls.executors.ExecutorSubmitter;
26
import eu.dnetlib.validator.service.impls.executors.ThreadExecutorSubmitter;
27
import eu.dnetlib.validator.service.impls.listeners.CrisListener;
28
import eu.dnetlib.validator.service.impls.listeners.ListenersManager;
29
import eu.dnetlib.validator.service.impls.persistance.MemoryRegistry;
30
import eu.dnetlib.validator.service.impls.providers.CrisProvider;
31
import eu.dnetlib.validator.service.impls.providers.DnetProvider;
32
import eu.dnetlib.validator.service.impls.providers.OAIPMHRecordProvider;
33
import eu.dnetlib.validator.service.impls.providers.ProvidersManager;
34

    
35

    
36
/**
37
 * A simple validator that stores rules, providers, and executing/pending jobs
38
 * in memory. It uses an ExecutorService to submit and execute jobs.
39
 * 
40
 * @author Manos Karvounis
41
 * @author Nikon Gasparis
42
 * @see ExecutorService
43
 */
44

    
45
@Deprecated
46
public class MemoryThreadValidator implements Validator {
47

    
48
	private CacheManager cacheManager;
49
	
50
	private ProvidersManager providersManager;
51
	
52
	private ListenersManager listenersManager;
53
	 
54
	private transient Logger log = Logger.getLogger(MemoryThreadValidator.class);
55

    
56
	private final ExecutorService executor;
57

    
58
	private final Executor taskExecutor;
59

    
60
	private final long timeout, generalTimeout;
61
	
62
	private Boolean dnetWorkflow;
63

    
64
	private MemoryRegistry<Rule> rules;	
65

    
66
	/**
67
	 * Creates a new validator.
68
	 * 
69
	 * @param executor
70
	 *            The module used to run the providers and submit jobs.
71
	 * @param taskExecutor
72
	 *            The executor used to execute the jobs.
73
	 * @param timeout
74
	 *            How long to wait for a task to be executed (in seconds). A
75
	 *            task is composed of a single Validation Object applied on all
76
	 *            the rules of the job.
77
	 * @param generalTimeout
78
	 *            How long to wait for a job to be executed (in seconds). A job
79
	 *            is composed of the application of all the rules on all of the
80
	 *            Validation Objects.
81
	 */
82
	public MemoryThreadValidator(ExecutorService executor, Executor taskExecutor, long timeout, long generalTimeout) {
83
		super();
84
		log.info("Creating a new Validator");
85
		this.executor = executor;
86
		this.taskExecutor = taskExecutor;
87
		this.timeout = timeout;
88
		this.generalTimeout = generalTimeout;
89
		rules = new MemoryRegistry<Rule>(RegistryType.rules);
90
	}
91
	
92
	public void init() {
93

    
94
	}
95

    
96
	@Override
97
	public <T extends Serializable> void addToRegistry(int objid, T obj, String registryName) {
98
		if (registryName.equals(RegistryType.rules)) {
99
			Rule rule = (Rule) obj;
100
			log.debug("Adding to registry rule " + rule.getId());
101
			rules.addObject(rule.getId(), rule);
102
		} else
103
			throw new UnsupportedOperationException("You may not add an object to a non-existing registry");
104
	}
105
	
106
	@Override
107
	public Serializable getFromRegistry(int objid, String registryName) throws ValidatorException {
108
		if (registryName.equals(RegistryType.rules)) {
109
			return rules.getObject(objid);
110
		} else
111
			throw new ValidatorException("The registry "+registryName+" does not exist");
112
	}
113

    
114
	@Override
115
	public <T extends Serializable> void addRegistry(String name) {
116
		throw new UnsupportedOperationException("You may not add new registries to this Validator implementation");
117
	}
118

    
119
	@Override
120
	public void submitJob(Job job, int workers, JobListener... listeners) throws ValidatorException {
121
		log.debug("Submitting job " + job.id);
122
		List<Provider> providers = new ArrayList<Provider>();
123

    
124
		try {
125
			log.debug("Creating a new provider instance");
126
			for (int i = 0 ; i<workers ; i++) {
127
				Provider prv = null;
128
				if (job.providerId == 1)
129
					prv = providersManager.createOaipmhRecordProvider();
130
				else if (job.providerId == 2)
131
					prv = providersManager.createOaipmhSinglePageVerbProvider();
132
				else if (job.providerId == 3)
133
					prv = providersManager.createDnetProvider();
134
				providers.add(prv);
135
			}
136
		} catch (Exception e) {
137
			log.error("Error creating provider instance", e);
138
		}
139
		ValidatorJobMainListener mainListener = new ValidatorJobMainListener();
140
//		mainListener.setWorkers(workers);
141
		Integer workerId=0;
142
		for (Provider prov : providers) {
143
			Properties props = job.providerProps;
144
			if (job.providerProps.getProperty(DnetProvider.MDSTORE_ID )!= null) {
145
				props = new Properties();
146
				props.setProperty(DnetProvider.WORKER_ID, Integer.toString(workerId));
147
				props.setProperty(DnetProvider.WORKERS, Integer.toString(workers));
148
				props.setProperty(DnetProvider.MDSTORE_ID, job.providerProps.getProperty(DnetProvider.MDSTORE_ID));
149
				props.setProperty(DnetProvider.BATCH_SIZE, job.providerProps.getProperty(DnetProvider.BATCH_SIZE));
150
				props.setProperty(DnetProvider.RECORDS, job.providerProps.getProperty(DnetProvider.RECORDS));
151
			}
152
			workerId++;
153
			prov.setConfiguration(props);
154
			String set = props.getProperty(OAIPMHRecordProvider.SET);
155
			if (set == null)
156
				set = "none";
157
			ExecutorSubmitter submitter = new ThreadExecutorSubmitter(job.id, job.rules, set, prov, mainListener, taskExecutor, timeout, generalTimeout, listeners);
158
			executor.submit(submitter);
159
		}  
160
	}
161
	
162
	@Override
163
	public void shutdown() throws ValidatorException {
164
		log.info("Shutting down validator");
165
		executor.shutdownNow();
166
		this.rules = null;
167
		
168
	}
169

    
170
	@Override
171
	public void submitJobForCris(Job job, Map<String, Set<Rule>> rulesPerEntity, Map<String, Set<Rule>> rulesPerEntityRef, JobListener... listeners)
172
			throws ValidatorException {
173
		
174
		CrisProvider prv = providersManager.createCrisProvider();
175
		
176
		log.debug("creating cris  - timeout: " + prv.getTimeout());
177
		String cacheName = UUID.randomUUID().toString();
178
		
179
		Properties props = job.providerProps;
180
		
181
		if (!rulesPerEntityRef.isEmpty()) {
182
			cacheManager.addCache(new Cache(cacheName, 20000, true, true, 10000, 10000));
183
			log.debug("caches: ");
184
			for (String cache : cacheManager.getCacheNames()) {
185
				log.debug("name: " + cache);
186
			}
187
			prv.setCache(cacheManager.getCache(cacheName));
188
		}
189
		
190
		
191
		prv.setEntities(rulesPerEntity.keySet());
192
		prv.setBaseUrl(props.getProperty(OAIPMHRecordProvider.BASEURL));
193
		prv.setMetadataPrefix(props.getProperty(OAIPMHRecordProvider.METADATA_PREFIX));
194
		prv.setRecords(props.getProperty(OAIPMHRecordProvider.RECORDS));
195
		
196
		ValidatorJobMainListener mainListener = new ValidatorJobMainListener();
197
//		mainListener.setWorkers(rulesPerEntity.keySet().size() + rulesPerEntityRef.keySet().size());
198
		
199
		CrisListener crisListener = listenersManager.createCrisListener();
200
		crisListener.setWorkersFirstPhase(rulesPerEntity.keySet().size());
201
//TODO remove as soon as spring validator works ok
202
//		crisListener.setExecutor(executor);
203
		crisListener.setProvider(prv);
204
		crisListener.setWorkers(rulesPerEntity.keySet().size() + rulesPerEntityRef.keySet().size());
205
		if (!rulesPerEntityRef.isEmpty()) {
206
			crisListener.setCacheManager(cacheManager);
207
			crisListener.setCacheName(cacheName);
208
		}
209
		
210
		mainListener.setCrisListener(crisListener);
211
		for (Map.Entry<String, Set<Rule>> entry : rulesPerEntity.entrySet()) {
212
			log.debug("Cris# set: "+ entry.getKey() + " - Rules size: " + entry.getValue().size());
213
			prv.setSet(entry.getKey());
214
			props.setProperty(OAIPMHRecordProvider.SET, entry.getKey());
215
			prv.setConfiguration(props);
216
			ThreadExecutorSubmitter submitter = new ThreadExecutorSubmitter(job.id, entry.getValue(), entry.getKey(), prv, mainListener, taskExecutor, timeout, generalTimeout, listeners);
217
			executor.submit(submitter);
218
		}
219
		
220
		
221
		log.error("Creating Sumbitters for referential check..");
222
		List<ThreadExecutorSubmitter> submittersForReferential = new ArrayList<ThreadExecutorSubmitter>();
223
		for (Map.Entry<String, Set<Rule>> entry : rulesPerEntityRef.entrySet()) {
224
			log.error("Ref Submitter set: " + entry.getKey() + " and rules: " + entry.getValue() + " and new rule ids: " + entry.getValue().size());
225
//			logger.debug("Cris# set: "+ entry.getKey() + " - Rules: " + entry.getValue());
226
			prv.setSet(entry.getKey());
227
			props.setProperty(OAIPMHRecordProvider.SET, entry.getKey());
228
			prv.setConfiguration(props);
229
			submittersForReferential.add(new ThreadExecutorSubmitter(job.id, entry.getValue(), entry.getKey(), prv, mainListener, taskExecutor, timeout, generalTimeout, listeners));
230
		}
231
		//TODO remove as soon as spring validator works ok
232
//		crisListener.setSubmittersForReferential(submittersForReferential);
233
		log.error("Sumbitters created for referential check: " + submittersForReferential.size());
234
		
235
	}
236
	
237
	@Override
238
	public void start() throws ValidatorException {
239
		log.debug("Starting validator");
240
	}
241

    
242
	/**
243
	 * Contains the names of the 3 containers (registries) used by this
244
	 * validator implementation.
245
	 * 
246
	 * @author Manos Karvounis
247
	 * @author Nikon Gasparis
248
	 * 
249
	 * @see Validator#addToRegistry(int, Serializable, String)
250
	 */
251
	
252
	public interface RegistryType {
253
		public static final String rules = "RULES";
254
	}
255

    
256
	private class ValidatorJobMainListener implements JobListener {
257
		
258
		private CrisListener crisListener = null;
259
		
260
		public void setCrisListener(CrisListener crisListener) {
261
			this.crisListener = crisListener;
262
		}
263

    
264
		@Override
265
		public void currentResults(List<CompletedTask> tasks, int jobId,
266
				Object object, Map<String, Object> recordContext, Throwable t) {
267
			// TODO Auto-generated method stub
268
			
269
		}
270

    
271
		@Override
272
		public void currentResults(List<CompletedTask> tasks, int jobId,
273
				Object object, Map<String, Object> recordContext) {
274
			// TODO Auto-generated method stub
275
			
276
		}
277

    
278
		@Override
279
		public synchronized void finished(int jobId, Map<String, Object> jobContext) {
280
			if (crisListener != null)
281
				crisListener.finished(jobId, jobContext);
282
			log.debug("Job " + jobId + " has finished, removing it from the registry");
283
		}
284

    
285
		@Override
286
		public synchronized void failed(int jobId, Map<String, Object> jobContext,
287
				Throwable t) {
288
			if (crisListener != null)
289
				crisListener.failed(jobId, jobContext, t);
290
			log.debug("A job has finished-failed, removing it from the registry");
291
		}
292

    
293
	}
294

    
295
	public Boolean getDnetWorkflow() {
296
		return dnetWorkflow;
297
	}
298

    
299
	public void setDnetWorkflow(Boolean dnetWorkflow) {
300
		this.dnetWorkflow = dnetWorkflow;
301
	}
302

    
303
	public ProvidersManager getProvidersManager() {
304
		return providersManager;
305
	}
306

    
307
	public void setProvidersManager(ProvidersManager providersManager) {
308
		this.providersManager = providersManager;
309
	}
310

    
311
	public CacheManager getCacheManager() {
312
		return cacheManager;
313
	}
314

    
315
	public void setCacheManager(CacheManager cacheManager) {
316
		this.cacheManager = cacheManager;
317
	}
318

    
319
	public ListenersManager getListenersManager() {
320
		return listenersManager;
321
	}
322

    
323
	public void setListenersManager(ListenersManager listenersManager) {
324
		this.listenersManager = listenersManager;
325
	}
326

    
327

    
328
}
(1-1/3)