1 |
38323
|
nikon.gasp
|
package eu.dnetlib.validator.service.impls;
|
2 |
|
|
|
3 |
|
|
import java.io.Serializable;
|
4 |
|
|
import java.util.ArrayList;
|
5 |
|
|
import java.util.List;
|
6 |
|
|
import java.util.Map;
|
7 |
|
|
import java.util.Properties;
|
8 |
|
|
import java.util.Set;
|
9 |
|
|
import java.util.UUID;
|
10 |
|
|
|
11 |
|
|
import net.sf.ehcache.Cache;
|
12 |
|
|
import net.sf.ehcache.CacheManager;
|
13 |
|
|
|
14 |
|
|
import org.apache.log4j.Logger;
|
15 |
|
|
import org.quartz.spi.ThreadPool;
|
16 |
|
|
import org.springframework.core.task.SyncTaskExecutor;
|
17 |
|
|
import org.springframework.core.task.TaskExecutor;
|
18 |
|
|
|
19 |
|
|
import eu.dnetlib.validator.engine.Validator;
|
20 |
|
|
import eu.dnetlib.validator.engine.ValidatorException;
|
21 |
|
|
import eu.dnetlib.validator.engine.data.Provider;
|
22 |
|
|
import eu.dnetlib.validator.engine.data.Rule;
|
23 |
|
|
import eu.dnetlib.validator.engine.execution.CompletedTask;
|
24 |
|
|
import eu.dnetlib.validator.engine.execution.Job;
|
25 |
|
|
import eu.dnetlib.validator.engine.execution.JobListener;
|
26 |
|
|
import eu.dnetlib.validator.service.impls.executors.JobWorker;
|
27 |
|
|
import eu.dnetlib.validator.service.impls.listeners.CrisListener;
|
28 |
|
|
import eu.dnetlib.validator.service.impls.listeners.ListenersManager;
|
29 |
|
|
import eu.dnetlib.validator.service.impls.providers.CrisProvider;
|
30 |
|
|
import eu.dnetlib.validator.service.impls.providers.DnetProviderNew;
|
31 |
|
|
import eu.dnetlib.validator.service.impls.providers.OAIPMHRecordProvider;
|
32 |
|
|
import eu.dnetlib.validator.service.impls.providers.ProvidersManager;
|
33 |
|
|
|
34 |
|
|
/**
|
35 |
|
|
* A simple validator that stores rules, providers, and executing/pending jobs
|
36 |
|
|
* in memory. It uses an SyncTaskExecutor to submit and execute jobs.
|
37 |
|
|
*
|
38 |
|
|
* @author Nikon Gasparis
|
39 |
|
|
* @see ThreadPool
|
40 |
|
|
*/
|
41 |
|
|
public class SpringValidator implements Validator {
|
42 |
|
|
|
43 |
|
|
private CacheManager cacheManager;
|
44 |
|
|
|
45 |
|
|
private ProvidersManager providersManager;
|
46 |
|
|
|
47 |
|
|
private ListenersManager listenersManager;
|
48 |
|
|
|
49 |
|
|
private transient Logger log = Logger.getLogger(SpringValidator.class);
|
50 |
|
|
|
51 |
|
|
private final TaskExecutor jobExecutor;
|
52 |
|
|
|
53 |
|
|
private final SyncTaskExecutor taskExecutor;
|
54 |
|
|
|
55 |
|
|
private final long generalTimeout;
|
56 |
|
|
|
57 |
|
|
private Boolean dnetWorkflow;
|
58 |
|
|
|
59 |
|
|
/**
|
60 |
|
|
* Creates a new validator.
|
61 |
|
|
*
|
62 |
41626
|
nikon.gasp
|
* @param jobExecutor
|
63 |
38323
|
nikon.gasp
|
* The module used to run the providers and submit jobs.
|
64 |
|
|
* @param taskExecutor
|
65 |
|
|
* The executor used to execute the jobs.
|
66 |
|
|
* @param generalTimeout
|
67 |
|
|
* How long to wait for a job to be executed (in seconds). A job
|
68 |
|
|
* is composed of the application of all the rules on all of the
|
69 |
|
|
* Validation Objects.
|
70 |
|
|
*/
|
71 |
|
|
|
72 |
|
|
public SpringValidator(TaskExecutor jobExecutor, SyncTaskExecutor taskExecutor, long generalTimeout) {
|
73 |
|
|
super();
|
74 |
|
|
log.info("Creating a new Validator");
|
75 |
|
|
this.generalTimeout = generalTimeout;
|
76 |
|
|
this.taskExecutor = taskExecutor;
|
77 |
|
|
this.jobExecutor = jobExecutor;
|
78 |
|
|
}
|
79 |
|
|
|
80 |
|
|
@Override
|
81 |
|
|
public <T extends Serializable> void addToRegistry(int objid, T obj,
|
82 |
|
|
String registryName) {
|
83 |
|
|
throw new UnsupportedOperationException(
|
84 |
|
|
"You may not add new registries to this Validator implementation");
|
85 |
|
|
}
|
86 |
|
|
|
87 |
|
|
@Override
|
88 |
|
|
public Serializable getFromRegistry(int objid, String registryName)
|
89 |
|
|
throws ValidatorException {
|
90 |
|
|
throw new UnsupportedOperationException(
|
91 |
|
|
"You may not add new registries to this Validator implementation");
|
92 |
|
|
}
|
93 |
|
|
|
94 |
|
|
@Override
|
95 |
|
|
public <T extends Serializable> void addRegistry(String name) {
|
96 |
|
|
throw new UnsupportedOperationException(
|
97 |
|
|
"You may not add new registries to this Validator implementation");
|
98 |
|
|
}
|
99 |
|
|
|
100 |
|
|
@Override
|
101 |
|
|
public void submitJob(Job job, int workers, JobListener... listeners)
|
102 |
|
|
throws ValidatorException {
|
103 |
|
|
log.debug("Submitting job " + job.id);
|
104 |
|
|
List<Provider> providers = new ArrayList<Provider>();
|
105 |
|
|
|
106 |
|
|
try {
|
107 |
|
|
log.debug("Creating a new provider instance");
|
108 |
|
|
for (int i = 0; i < workers; i++) {
|
109 |
|
|
Provider prv = null;
|
110 |
|
|
if (job.providerId == 1)
|
111 |
|
|
prv = providersManager.createOaipmhRecordProvider();
|
112 |
|
|
else if (job.providerId == 2)
|
113 |
|
|
prv = providersManager.createOaipmhSinglePageVerbProvider();
|
114 |
|
|
else if (job.providerId == 3)
|
115 |
|
|
prv = providersManager.createDnetProvider();
|
116 |
|
|
providers.add(prv);
|
117 |
|
|
}
|
118 |
|
|
} catch (Exception e) {
|
119 |
|
|
log.error("Error creating provider instance", e);
|
120 |
|
|
}
|
121 |
|
|
ValidatorJobMainListener mainListener = new ValidatorJobMainListener();
|
122 |
|
|
// mainListener.setWorkers(workers);
|
123 |
|
|
Integer workerId = 0;
|
124 |
|
|
for (Provider prov : providers) {
|
125 |
|
|
Properties props = job.providerProps;
|
126 |
|
|
if (job.providerProps.getProperty(DnetProviderNew.MDSTORE_ID) != null) {
|
127 |
|
|
props = new Properties();
|
128 |
|
|
props.setProperty(DnetProviderNew.WORKER_ID,
|
129 |
|
|
Integer.toString(workerId));
|
130 |
|
|
props.setProperty(DnetProviderNew.WORKERS,
|
131 |
|
|
Integer.toString(workers));
|
132 |
|
|
props.setProperty(DnetProviderNew.MDSTORE_ID,
|
133 |
|
|
job.providerProps.getProperty(DnetProviderNew.MDSTORE_ID));
|
134 |
|
|
props.setProperty(DnetProviderNew.BATCH_SIZE,
|
135 |
|
|
job.providerProps.getProperty(DnetProviderNew.BATCH_SIZE));
|
136 |
|
|
props.setProperty(DnetProviderNew.RECORDS,
|
137 |
|
|
job.providerProps.getProperty(DnetProviderNew.RECORDS));
|
138 |
|
|
}
|
139 |
|
|
workerId++;
|
140 |
|
|
prov.setConfiguration(props);
|
141 |
|
|
String set = props.getProperty(OAIPMHRecordProvider.SET);
|
142 |
|
|
if (set == null)
|
143 |
|
|
set = "none";
|
144 |
|
|
JobWorker jobWorker = new JobWorker(job.id, job.rules, set, prov,
|
145 |
|
|
mainListener, taskExecutor, generalTimeout, listeners);
|
146 |
|
|
jobExecutor.execute(jobWorker);
|
147 |
|
|
}
|
148 |
|
|
}
|
149 |
|
|
|
150 |
|
|
@Override
|
151 |
|
|
public void submitJobForCris(Job job,
|
152 |
|
|
Map<String, Set<Rule>> rulesPerEntity,
|
153 |
|
|
Map<String, Set<Rule>> rulesPerEntityRef, JobListener... listeners)
|
154 |
|
|
throws ValidatorException {
|
155 |
|
|
|
156 |
|
|
CrisProvider prv = providersManager.createCrisProvider();
|
157 |
|
|
|
158 |
|
|
log.debug("creating cris - timeout: " + prv.getTimeout());
|
159 |
|
|
String cacheName = UUID.randomUUID().toString();
|
160 |
|
|
|
161 |
|
|
Properties props = job.providerProps;
|
162 |
|
|
|
163 |
|
|
if (!rulesPerEntityRef.isEmpty()) {
|
164 |
|
|
cacheManager.addCache(new Cache(cacheName, 20000, true, true,
|
165 |
|
|
10000, 10000));
|
166 |
|
|
log.debug("caches: ");
|
167 |
|
|
for (String cache : cacheManager.getCacheNames()) {
|
168 |
|
|
log.debug("name: " + cache);
|
169 |
|
|
}
|
170 |
|
|
prv.setCache(cacheManager.getCache(cacheName));
|
171 |
|
|
}
|
172 |
|
|
|
173 |
|
|
prv.setEntities(rulesPerEntity.keySet());
|
174 |
|
|
prv.setBaseUrl(props.getProperty(OAIPMHRecordProvider.BASEURL));
|
175 |
|
|
prv.setMetadataPrefix(props
|
176 |
|
|
.getProperty(OAIPMHRecordProvider.METADATA_PREFIX));
|
177 |
|
|
prv.setRecords(props.getProperty(OAIPMHRecordProvider.RECORDS));
|
178 |
|
|
|
179 |
|
|
ValidatorJobMainListener mainListener = new ValidatorJobMainListener();
|
180 |
|
|
// mainListener.setWorkers(rulesPerEntity.keySet().size() +
|
181 |
|
|
// rulesPerEntityRef.keySet().size());
|
182 |
|
|
|
183 |
|
|
CrisListener crisListener = listenersManager.createCrisListener();
|
184 |
|
|
crisListener.setWorkersFirstPhase(rulesPerEntity.keySet().size());
|
185 |
|
|
|
186 |
|
|
|
187 |
|
|
//TODO UPDATE LISTENER
|
188 |
41626
|
nikon.gasp
|
|
189 |
|
|
crisListener.setExecutor(jobExecutor);
|
190 |
38323
|
nikon.gasp
|
// crisListener.setExecutor(taskExecutor);
|
191 |
|
|
crisListener.setProvider(prv);
|
192 |
|
|
crisListener.setWorkers(rulesPerEntity.keySet().size()
|
193 |
|
|
+ rulesPerEntityRef.keySet().size());
|
194 |
|
|
if (!rulesPerEntityRef.isEmpty()) {
|
195 |
|
|
crisListener.setCacheManager(cacheManager);
|
196 |
|
|
crisListener.setCacheName(cacheName);
|
197 |
|
|
}
|
198 |
|
|
|
199 |
|
|
mainListener.setCrisListener(crisListener);
|
200 |
|
|
for (Map.Entry<String, Set<Rule>> entry : rulesPerEntity.entrySet()) {
|
201 |
|
|
log.debug("Cris# set: " + entry.getKey() + " - Rules size: "
|
202 |
|
|
+ entry.getValue().size());
|
203 |
|
|
prv.setSet(entry.getKey());
|
204 |
|
|
props.setProperty(OAIPMHRecordProvider.SET, entry.getKey());
|
205 |
|
|
prv.setConfiguration(props);
|
206 |
|
|
JobWorker jobWorker = new JobWorker(job.id, entry.getValue(), entry.getKey(), prv,
|
207 |
|
|
mainListener, taskExecutor, generalTimeout,
|
208 |
|
|
listeners);
|
209 |
|
|
jobExecutor.execute(jobWorker);
|
210 |
|
|
}
|
211 |
|
|
|
212 |
|
|
log.error("Creating Sumbitters for referential check..");
|
213 |
|
|
List<JobWorker> submittersForReferential = new ArrayList<JobWorker>();
|
214 |
|
|
for (Map.Entry<String, Set<Rule>> entry : rulesPerEntityRef.entrySet()) {
|
215 |
|
|
log.error("Ref Submitter set: " + entry.getKey() + " and rules: "
|
216 |
|
|
+ entry.getValue() + " and new rule ids: "
|
217 |
|
|
+ entry.getValue().size());
|
218 |
|
|
// logger.debug("Cris# set: "+ entry.getKey() + " - Rules: " +
|
219 |
|
|
// entry.getValue());
|
220 |
|
|
prv.setSet(entry.getKey());
|
221 |
|
|
props.setProperty(OAIPMHRecordProvider.SET, entry.getKey());
|
222 |
|
|
prv.setConfiguration(props);
|
223 |
|
|
submittersForReferential.add(new JobWorker(job.id,
|
224 |
|
|
entry.getValue(), entry.getKey(), prv, mainListener,
|
225 |
|
|
taskExecutor, generalTimeout, listeners));
|
226 |
|
|
}
|
227 |
|
|
//TODO UPDATE LISTENER
|
228 |
|
|
// crisListener.setSubmittersForReferential(submittersForReferential);
|
229 |
41626
|
nikon.gasp
|
crisListener.setSubmittersForReferential(submittersForReferential);
|
230 |
38323
|
nikon.gasp
|
log.error("Sumbitters created for referential check: "
|
231 |
|
|
+ submittersForReferential.size());
|
232 |
|
|
|
233 |
|
|
}
|
234 |
|
|
|
235 |
|
|
@Override
|
236 |
|
|
public void start() throws ValidatorException {
|
237 |
|
|
log.debug("Starting validator");
|
238 |
|
|
}
|
239 |
|
|
|
240 |
|
|
|
241 |
|
|
private class ValidatorJobMainListener implements JobListener {
|
242 |
|
|
|
243 |
|
|
private CrisListener crisListener = null;
|
244 |
|
|
|
245 |
|
|
public void setCrisListener(CrisListener crisListener) {
|
246 |
|
|
this.crisListener = crisListener;
|
247 |
|
|
}
|
248 |
|
|
|
249 |
|
|
@Override
|
250 |
|
|
public void currentResults(List<CompletedTask> tasks, int jobId,
|
251 |
|
|
Object object, Map<String, Object> recordContext, Throwable t) {
|
252 |
|
|
// TODO Auto-generated method stub
|
253 |
|
|
|
254 |
|
|
}
|
255 |
|
|
|
256 |
|
|
@Override
|
257 |
|
|
public void currentResults(List<CompletedTask> tasks, int jobId,
|
258 |
|
|
Object object, Map<String, Object> recordContext) {
|
259 |
|
|
// TODO Auto-generated method stub
|
260 |
|
|
|
261 |
|
|
}
|
262 |
|
|
|
263 |
|
|
@Override
|
264 |
|
|
public synchronized void finished(int jobId,
|
265 |
|
|
Map<String, Object> jobContext) {
|
266 |
|
|
if (crisListener != null)
|
267 |
|
|
crisListener.finished(jobId, jobContext);
|
268 |
|
|
log.debug("Job " + jobId
|
269 |
|
|
+ " has finished, removing it from the registry");
|
270 |
|
|
}
|
271 |
|
|
|
272 |
|
|
@Override
|
273 |
|
|
public synchronized void failed(int jobId,
|
274 |
|
|
Map<String, Object> jobContext, Throwable t) {
|
275 |
|
|
if (crisListener != null)
|
276 |
|
|
crisListener.failed(jobId, jobContext, t);
|
277 |
|
|
log.debug("A job has finished-failed, removing it from the registry");
|
278 |
|
|
}
|
279 |
|
|
|
280 |
|
|
}
|
281 |
|
|
|
282 |
|
|
public Boolean getDnetWorkflow() {
|
283 |
|
|
return dnetWorkflow;
|
284 |
|
|
}
|
285 |
|
|
|
286 |
|
|
public void setDnetWorkflow(Boolean dnetWorkflow) {
|
287 |
|
|
this.dnetWorkflow = dnetWorkflow;
|
288 |
|
|
}
|
289 |
|
|
|
290 |
|
|
public ProvidersManager getProvidersManager() {
|
291 |
|
|
return providersManager;
|
292 |
|
|
}
|
293 |
|
|
|
294 |
|
|
public void setProvidersManager(ProvidersManager providersManager) {
|
295 |
|
|
this.providersManager = providersManager;
|
296 |
|
|
}
|
297 |
|
|
|
298 |
|
|
public CacheManager getCacheManager() {
|
299 |
|
|
return cacheManager;
|
300 |
|
|
}
|
301 |
|
|
|
302 |
|
|
public void setCacheManager(CacheManager cacheManager) {
|
303 |
|
|
this.cacheManager = cacheManager;
|
304 |
|
|
}
|
305 |
|
|
|
306 |
|
|
public ListenersManager getListenersManager() {
|
307 |
|
|
return listenersManager;
|
308 |
|
|
}
|
309 |
|
|
|
310 |
|
|
public void setListenersManager(ListenersManager listenersManager) {
|
311 |
|
|
this.listenersManager = listenersManager;
|
312 |
|
|
}
|
313 |
|
|
|
314 |
|
|
@Override
|
315 |
|
|
public void shutdown() throws ValidatorException {
|
316 |
|
|
// TODO Auto-generated method stub
|
317 |
|
|
|
318 |
|
|
}
|
319 |
|
|
|
320 |
|
|
}
|