Project

General

Profile

1
package eu.dnetlib.validator.service.impls.listeners;
2

    
3
import java.io.BufferedWriter;
4
import java.io.File;
5
import java.io.FileWriter;
6
import java.io.IOException;
7
import java.util.ArrayList;
8
import java.util.Calendar;
9
import java.util.List;
10
import java.util.Map;
11
import java.util.concurrent.BlockingQueue;
12
import java.util.concurrent.LinkedBlockingQueue;
13
import java.util.concurrent.atomic.AtomicInteger;
14

    
15
import eu.dnetlib.domain.functionality.validator.StoredJob;
16
import eu.dnetlib.validator.service.impl.ValidatorManager;
17
import org.apache.log4j.Logger;
18
import org.springframework.core.task.TaskExecutor;
19

    
20
import eu.dnetlib.api.enabling.ResultSetService;
21
import eu.dnetlib.api.enabling.ResultSetServiceException;
22
import eu.dnetlib.domain.EPR;
23
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
24
import eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler;
25
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
26
import eu.dnetlib.utils.EPRUtils;
27
import eu.dnetlib.validator.engine.ValidatorException;
28
import eu.dnetlib.validator.engine.execution.CompletedTask;
29
import eu.dnetlib.validator.engine.execution.JobListener;
30
import gr.uoa.di.driver.util.ServiceLocator;
31

    
32
import javax.annotation.PostConstruct;
33

    
34
public class DnetListener implements JobListener {
35

    
36
    private static Logger logger = Logger.getLogger(DnetListener.class);
37
    private final AtomicInteger activeThreads = new AtomicInteger(0);
38
    private Object allThreadsFinished = new Object();
39
    private RecordXMLBuilder xmlBuilder;
40
    private BlackboardJob job;
41
    private BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler;
42
    private TaskExecutor rsExecutor;
43
    private ServiceLocator<ResultSetService> resultSetServiceLocator = null;
44
    private ResultSetService resultSetService;
45
    private String outputResultSetID;
46
    private List<String> outputDiskBuffer;
47
    private EPR outputEpr;
48
    private int internalJobsFinished = 0;
49
    private int internalJobsSum = 1;
50
    private int objsValidated = 0;
51
    private int jobStatusUpdateInterval;
52
    private boolean enableOutputToRS = false;
53
    private boolean enableOutputToDisk = false;
54
    private BufferedWriter bw = null;
55
    private BlockingQueue<String> queue;
56
    private int validationJobId;
57

    
58
    private ValidatorManager validatorManager;
59
    private String groupBy;
60

    
61
    public void initOutputs() {
62
        logger.debug("initializing outputs");
63
        if (enableOutputToRS) {
64
            this.initEprOutput();
65
            job.getParameters().put("outputResultSetEpr", EPRUtils.eprToXml(outputEpr));
66

    
67
            logger.debug("output epr id successfully set: " + EPRUtils.eprToXml(outputEpr));
68
        }
69

    
70
        if (enableOutputToDisk) {
71
            this.initDiskOutput();
72
            logger.debug("disk output ok");
73
        } else {
74
            logger.debug("initializing disk disabled");
75
        }
76
    }
77

    
78
    @Override
79
    public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext, Throwable t) throws ValidatorException {
80
        try {
81
            blackboardHandler.getBlackboardHandler().failed(job, t);
82
        } catch (Exception e) {
83
            logger.error("Error while proccessing record results");
84
            throw new ValidatorException("Error while proccessing record results", e);
85
        }
86
    }
87

    
88
    @Override
89
    public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext) throws ValidatorException {
90
        try {
91
            if (enableOutputToRS) {
92
                @SuppressWarnings("unchecked")
93
                String xmlString = xmlBuilder.buildXml((List<Map<String, String>>) recordContext.get("veloList"), record, (Map<String, String>) recordContext.get("recordValidationResult"));
94
                this.sendToQueue(xmlString);
95
            }
96

    
97
            if (enableOutputToDisk) {
98
                if ((Integer) recordContext.get("recordBlacklistScore") < 100) {
99
                    bw.write(tasks.get(0).valobjId);
100
                    bw.newLine();
101
                }
102

    
103
//				outputDiskBuffer.add(tasks.get(0).valobjId);
104
//				if (outputDiskBuffer.size() % 1000 == 0) {
105
//					for (String id : outputDiskBuffer) {
106
//						bw.write(id);
107
//						bw.newLine();
108
//					}
109
//					outputDiskBuffer.clear();
110
//				}
111
            }
112

    
113
            objsValidated++;
114

    
115
            if (objsValidated % jobStatusUpdateInterval == 0) {
116
                job.getParameters().put("recordsTested", Integer.toString(objsValidated));
117
                blackboardHandler.getBlackboardHandler().ongoing(job);
118
            }
119
        } catch (Exception e) {
120
            logger.error("Error while proccessing record results");
121
            throw new ValidatorException("Error while proccessing record results", e);
122
        }
123
    }
124

    
125
    @Override
126
    public synchronized void finished(int jobId, Map<String, Object> jobContext) {
127
        try {
128

    
129
            logger.debug("Job " + jobId + " finished");
130

    
131
            internalJobsFinished++;
132
            if (internalJobsFinished == internalJobsSum) {
133

    
134
                logger.debug("internalJobsFinished == internalJobsSum");
135
                if (enableOutputToRS) {
136
                    // send remaining xmls from buffers
137
//					if (outputRSBuffer.size() > 0) {
138
//						try {
139
//							resultSetService.populateRS(outputResultSetID,
140
//									outputRSBuffer);
141
//							outputRSBuffer.clear();
142
//						} catch (ResultSetServiceException e) {
143
//							logger.error("Error populating ResultSetService.",
144
//									e);
145
//						}
146
//					}
147
                    try {
148
                        queue.put("finished");
149

    
150
                    } catch (InterruptedException e) {
151
                        logger.error("Error finalizing queue", e);
152
                    }
153

    
154
                    if (queue.size() > 0) {
155
                        long time1 = Calendar.getInstance().getTimeInMillis();
156
                        activeThreads.getAndIncrement();
157
                        RSTask task = new RSTask(resultSetService, outputResultSetID, queue, activeThreads, allThreadsFinished);
158
                        rsExecutor.execute(task);
159
                        long time2 = Calendar.getInstance().getTimeInMillis();
160
                        logger.debug("Populating RS took " + ((time2 - time1))
161
                                + " milli seconds");
162
                    }
163

    
164
                    // finally, close rs.
165
                    logger.debug("active threads to finish: " + activeThreads.get());
166
                    logger.debug("trying to close result set");
167
                    while (activeThreads.get() > 0) {
168
                        logger.debug("waiting active threads to finish. Remaining: " + activeThreads.get());
169
                        synchronized (allThreadsFinished) {
170
                            allThreadsFinished.wait();
171
                        }
172
                        logger.debug("retrying to finish. Remaining: " + activeThreads.get());
173
                    }
174
                    logger.debug("closing result set");
175
                    resultSetService.closeRS(outputResultSetID);
176
                }
177
                if (enableOutputToDisk) {
178
                    bw.write("FINISHED");
179
//					bw.newLine();
180
//					logger.debug("output disk buffer size: " + outputDiskBuffer.size());
181
//					if (outputDiskBuffer.size() > 0) {
182
//						for (String id : outputDiskBuffer) {
183
//							bw.write(id);
184
//							bw.newLine();
185
//						}
186
//						outputDiskBuffer.clear();
187
//					}
188
                    bw.close();
189
                }
190
                // job.getParameters().put("outputResultSetEpr",
191
                // EPRUtils.eprToXml(outputEpr));
192
                job.getParameters().put("jobId", Integer.toString(validationJobId));
193

    
194
                job.getParameters().put("recordsTested", Integer.toString(objsValidated));
195
//				 job.getParameters().put(
196
//				 "recordsInResultSet",
197
//				 Integer.toString(resultSetService
198
//				 .getNumberOfElements(outputResultSetID)));
199
                // update job status
200

    
201
                logger.info("Getting stored job");
202
                StoredJob storedJob = validatorManager.getStoredJob(jobId, this.groupBy);
203
                logger.info("Score: " + storedJob.getContentJobScore());
204
                job.getParameters().put("score", storedJob.getContentJobScore() + "");
205

    
206
                blackboardHandler.getBlackboardHandler().done(job);
207

    
208
            } else {
209
                logger.debug("Waiting "
210
                        + (internalJobsSum - internalJobsFinished)
211
                        + " job(s) to finish");
212
            }
213
        } catch (Exception e) {
214
            logger.error("Error while finalizing successfull workflow job", e);
215
        }
216
    }
217

    
218
    @Override
219
    public synchronized void failed(int jobId, Map<String, Object> jobContext,
220
                                    Throwable t) {
221
        try {
222
            internalJobsFinished++;
223
            if (internalJobsFinished == internalJobsSum) {
224
                if (enableOutputToRS) {
225
                    resultSetService.closeRS(outputResultSetID);
226
                }
227
                if (enableOutputToDisk) {
228
                    outputDiskBuffer.clear();
229
                    bw.close();
230
                    File file = new File("/tmp/validator-wf/" + validationJobId);
231
//					File file = new File("/tmp/validator-wf/"
232
//							+ job.getParameters().get("datasourceId"));
233
                    if (file.exists()) {
234
                        file.delete();
235
                    }
236
                }
237
                job.getParameters().put("jobId",
238
                        Integer.toString(validationJobId));
239

    
240
                // update job status
241
                blackboardHandler.getBlackboardHandler().failed(job, t);
242
            } else {
243
                logger.debug("Waiting "
244
                        + (internalJobsSum - internalJobsFinished)
245
                        + " job(s) to finish");
246
            }
247
        } catch (Exception e) {
248
            logger.error("Error while finalizing failed workflow job", e);
249
        }
250
    }
251

    
252
    private void sendToQueue(String xmlString) {
253
        logger.debug("received passed XMLresult");
254

    
255
        try {
256
            queue.put(xmlString);
257
        } catch (InterruptedException e1) {
258
            logger.error("Error putting in queue.", e1);
259
        }
260
        if (queue.size() > 50) {
261
            long time1 = Calendar.getInstance().getTimeInMillis();
262
            RSTask task = new RSTask(resultSetService, outputResultSetID, queue, activeThreads, allThreadsFinished);
263
//			synchronized (counterLock) {
264
//				activeThreads ++;
265
//			}
266
            activeThreads.getAndIncrement();
267
            rsExecutor.execute(task);
268
            long time2 = Calendar.getInstance().getTimeInMillis();
269
            logger.debug("Populating RS took " + ((time2 - time1))
270
                    + " milli seconds");
271
        }
272
    }
273

    
274
    private void initEprOutput() {
275
        try {
276
            logger.debug("Initializing ResultSetService.");
277
            // Get reference to service
278
            resultSetService = resultSetServiceLocator.getService();
279
            // Create a new result set
280
            outputEpr = resultSetService.createPushRS(86400, 0);
281
            // get result set ids
282
            outputResultSetID = outputEpr.getParameter("ResourceIdentifier");
283

    
284
            // initializing buffers
285
//            outputRSBuffer = new ArrayList<String>();
286

    
287
            queue = new LinkedBlockingQueue<String>();
288

    
289
        } catch (Exception e) {
290
            logger.error("Error initializing ResultSetService.", e);
291
            blackboardHandler.getBlackboardHandler().failed(job, e);
292
        }
293
    }
294

    
295
    private void initDiskOutput() {
296
        try {
297

    
298
            logger.debug("Initializing FileOutputStream.");
299
//			File file = new File("/var/lib/dnet/validator/workflow_blacklists/"
300
//					+ validationJobId);
301
            String datasourceId;
302
            if (job.getParameters().get("datasourceId") != null) {
303
                datasourceId = job.getParameters().get("datasourceId");
304
            } else if (job.getParameters().get("datasourceID") != null) {
305
                datasourceId = job.getParameters().get("datasourceID");
306
            } else {
307
                datasourceId = "unknownId";
308
            }
309

    
310
            File file = new File("/var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
311
            logger.debug("File: + /var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
312
            if (file.exists()) {
313
                logger.debug("File will be replaced");
314
                file.delete();
315
            }
316
            file.createNewFile();
317

    
318
            bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()));
319

    
320
//			GZIPOutputStream zip = new GZIPOutputStream(new FileOutputStream(file.getAbsoluteFile()));
321

    
322
//			bw = new BufferedWriter(new OutputStreamWriter(zip, "UTF-8"));
323

    
324
            outputDiskBuffer = new ArrayList<String>();
325

    
326
        } catch (IOException e) {
327
            logger.error("Error initializing FileOutputStream.", e);
328
            blackboardHandler.getBlackboardHandler().failed(job, e);
329

    
330
        }
331
    }
332

    
333
    public RecordXMLBuilder getXmlBuilder() {
334
        return xmlBuilder;
335
    }
336

    
337
    public void setXmlBuilder(RecordXMLBuilder xmlBuilder) {
338
        this.xmlBuilder = xmlBuilder;
339
    }
340

    
341
    public BlackboardJob getJob() {
342
        return job;
343
    }
344

    
345
    public void setJob(BlackboardJob job) {
346
        this.job = job;
347
    }
348

    
349
    public ServiceLocator<ResultSetService> getResultSetServiceLocator() {
350
        return resultSetServiceLocator;
351
    }
352

    
353
    public void setResultSetServiceLocator(
354
            ServiceLocator<ResultSetService> resultSetServiceLocator) {
355
        this.resultSetServiceLocator = resultSetServiceLocator;
356
    }
357

    
358
    public BlackboardNotificationHandler<BlackboardServerHandler> getBlackboardHandler() {
359
        return blackboardHandler;
360
    }
361

    
362
    public void setBlackboardHandler(
363
            BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler) {
364
        this.blackboardHandler = blackboardHandler;
365
    }
366

    
367
    public TaskExecutor getRsExecutor() {
368
        return rsExecutor;
369
    }
370

    
371
    public void setRsExecutor(TaskExecutor rsExecutor) {
372
        this.rsExecutor = rsExecutor;
373
    }
374

    
375
    public int getInternalJobsSum() {
376
        return internalJobsSum;
377
    }
378

    
379
    public void setInternalJobsSum(int internalJobsSum) {
380
        this.internalJobsSum = internalJobsSum;
381
    }
382

    
383
    public int getValidationJobId() {
384
        return validationJobId;
385
    }
386

    
387
    public void setValidationJobId(int validationJobId) {
388
        this.validationJobId = validationJobId;
389
    }
390

    
391
    public int getJobStatusUpdateInterval() {
392
        return jobStatusUpdateInterval;
393
    }
394

    
395
    public void setJobStatusUpdateInterval(int jobStatusUpdateInterval) {
396
        this.jobStatusUpdateInterval = jobStatusUpdateInterval;
397
    }
398

    
399
    public boolean isEnableOutputToDisk() {
400
        return enableOutputToDisk;
401
    }
402

    
403
    public void setEnableOutputToDisk(boolean enableOutputToDisk) {
404
        this.enableOutputToDisk = enableOutputToDisk;
405
    }
406

    
407
    public boolean isEnableOutputToRS() {
408
        return enableOutputToRS;
409
    }
410

    
411
    public void setEnableOutputToRS(boolean enableOutputToRS) {
412
        this.enableOutputToRS = enableOutputToRS;
413
    }
414

    
415
    public ValidatorManager getValidatorManager() {
416
        return validatorManager;
417
    }
418

    
419
    public void setValidatorManager(ValidatorManager validatorManager) {
420
        this.validatorManager = validatorManager;
421
    }
422

    
423
    public String getGroupBy() {
424
        return groupBy;
425
    }
426

    
427
    public void setGroupBy(String groupBy) {
428
        this.groupBy = groupBy;
429
    }
430
}
(3-3/8)