Project

General

Profile

1
package eu.dnetlib.validator.service.impls.listeners;
2

    
3
import java.io.BufferedWriter;
4
import java.io.File;
5
import java.io.FileWriter;
6
import java.io.IOException;
7
import java.util.ArrayList;
8
import java.util.Calendar;
9
import java.util.List;
10
import java.util.Map;
11
import java.util.concurrent.BlockingQueue;
12
import java.util.concurrent.LinkedBlockingQueue;
13
import java.util.concurrent.atomic.AtomicInteger;
14

    
15
import org.apache.log4j.Logger;
16
import org.springframework.core.task.TaskExecutor;
17

    
18
import eu.dnetlib.api.enabling.ResultSetService;
19
import eu.dnetlib.api.enabling.ResultSetServiceException;
20
import eu.dnetlib.domain.EPR;
21
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
22
import eu.dnetlib.enabling.tools.blackboard.BlackboardNotificationHandler;
23
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
24
import eu.dnetlib.utils.EPRUtils;
25
import eu.dnetlib.validator.engine.ValidatorException;
26
import eu.dnetlib.validator.engine.execution.CompletedTask;
27
import eu.dnetlib.validator.engine.execution.JobListener;
28
import gr.uoa.di.driver.util.ServiceLocator;
29

    
30
import javax.annotation.PostConstruct;
31

    
32
public class DnetListener implements JobListener {
33

    
34
    private static Logger logger = Logger.getLogger(DnetListener.class);
35
    private final AtomicInteger activeThreads = new AtomicInteger(0);
36
    public Object allThreadsFinished = new Object();
37
    private RecordXMLBuilder xmlBuilder;
38
    private BlackboardJob job;
39
    private BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler;
40
    private TaskExecutor rsExecutor;
41
    private ServiceLocator<ResultSetService> resultSetServiceLocator = null;
42
    private ResultSetService resultSetService;
43
    private String outputResultSetID;
44
    private List<String> outputRSBuffer, outputDiskBuffer;
45
    private EPR outputEpr;
46
    private int internalJobsFinished = 0;
47
    private int internalJobsSum = 1;
48
    private int objsValidated = 0;
49
    private int jobStatusUpdateInterval;
50
    private boolean eprSetOk = false;
51
    private boolean enableOutputToRS = false;
52
    private boolean enableOutputToDisk = false;
53
    private boolean diskSetOk = false;
54
    private BufferedWriter bw = null;
55
    private BlockingQueue<String> queue;
56
    private int validationJobId;
57

    
58
    @PostConstruct
59
    public void initOutputs() {
60
        logger.debug("initializing outputs");
61
        if (enableOutputToRS) {
62
            if (!eprSetOk) {
63
                this.initEprOutput();
64
                job.getParameters().put("outputResultSetEpr",
65
                        EPRUtils.eprToXml(outputEpr));
66
                eprSetOk = true;
67
                logger.debug("epr output ok");
68
                logger.debug("output epr id successfully set: "
69
                        + EPRUtils.eprToXml(outputEpr));
70
                logger.debug("current job status: " + job.getActionStatus());
71
            }
72
        }
73

    
74
        if (enableOutputToDisk) {
75
            logger.debug("initializing disk 0");
76
            if (!diskSetOk) {
77
                logger.debug("initializing disk 1");
78
                this.initDiskOutput();
79
                diskSetOk = true;
80
                logger.debug("disk output ok");
81
            }
82
        } else {
83
            logger.debug("initializing disk disabled");
84
        }
85
    }
86

    
87
    public void initEprOutput() {
88
        try {
89
            logger.debug("Initializing ResultSetService.");
90
            // Get reference to service
91
            resultSetService = resultSetServiceLocator.getService();
92
            // Create a new result set
93
            outputEpr = resultSetService.createPushRS(86400, 0);
94
            // get result set ids
95
            outputResultSetID = outputEpr.getParameter("ResourceIdentifier");
96

    
97
            // initializing buffers
98
            outputRSBuffer = new ArrayList<String>();
99

    
100
            queue = new LinkedBlockingQueue<String>();
101

    
102
        } catch (Exception e) {
103
            logger.error("Error initializing ResultSetService.", e);
104
            blackboardHandler.getBlackboardHandler().failed(job, e);
105
        }
106
    }
107

    
108
    public void initDiskOutput() {
109
        try {
110

    
111
            logger.debug("Initializing FileOutputStream.");
112
//			File file = new File("/var/lib/dnet/validator/workflow_blacklists/"
113
//					+ validationJobId);
114
            String datasourceId;
115
            if (job.getParameters().get("datasourceId") != null) {
116
                datasourceId = job.getParameters().get("datasourceId");
117
            } else if (job.getParameters().get("datasourceID") != null) {
118
                datasourceId = job.getParameters().get("datasourceID");
119
            } else {
120
                datasourceId = "unknownId";
121
            }
122

    
123
            File file = new File("/var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
124
            logger.debug("File: + /var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
125
            if (file.exists()) {
126
                logger.debug("File will be replaced");
127
                file.delete();
128
            }
129
            file.createNewFile();
130

    
131
            bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()));
132

    
133
//			GZIPOutputStream zip = new GZIPOutputStream(new FileOutputStream(file.getAbsoluteFile()));
134

    
135
//			bw = new BufferedWriter(new OutputStreamWriter(zip, "UTF-8"));
136

    
137
            outputDiskBuffer = new ArrayList<String>();
138

    
139
        } catch (IOException e) {
140
            logger.error("Error initializing FileOutputStream.", e);
141
            blackboardHandler.getBlackboardHandler().failed(job, e);
142

    
143
        }
144
    }
145

    
146
    @Override
147
    public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext, Throwable t) throws ValidatorException {
148
        try {
149
            blackboardHandler.getBlackboardHandler().failed(job, t);
150
        } catch (Exception e) {
151
            logger.error("Error while proccessing record results");
152
            throw new ValidatorException("Error while proccessing record results", e);
153
        }
154
    }
155

    
156
    @Override
157
    public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext) throws ValidatorException {
158
        try {
159
            if (enableOutputToRS) {
160
                if (!eprSetOk) {
161
                    this.initEprOutput();
162
                    job.getParameters().put("outputResultSetEpr", EPRUtils.eprToXml(outputEpr));
163
                    eprSetOk = true;
164

    
165
                    logger.error("output epr id successfully set: " + EPRUtils.eprToXml(outputEpr));
166
                    logger.error("current job status: " + job.getActionStatus());
167
                }
168

    
169
                long time1 = Calendar.getInstance().getTimeInMillis();
170
//                @SuppressWarnings("unchecked")
171
                String xmlString = xmlBuilder.buildXml((List<Map<String, String>>) recordContext.get("veloList"), record, (Map<String, String>) recordContext.get("recordValidationResult"));
172
                long time2 = Calendar.getInstance().getTimeInMillis();
173

    
174
                logger.debug("Building the xml took " + ((time2 - time1)) + " milli seconds");
175
                this.sendToQueue(xmlString);
176
                // logger.debug("XML: " + xmlString);
177
            }
178

    
179
            if (enableOutputToDisk) {
180
                if (!diskSetOk) {
181
                    this.initDiskOutput();
182
                    diskSetOk = true;
183
                }
184

    
185
                if ((Integer) recordContext.get("recordBlacklistScore") < 100) {
186
                    bw.write(tasks.get(0).valobjId);
187
                    bw.newLine();
188
                }
189

    
190
//				outputDiskBuffer.add(tasks.get(0).valobjId);
191
//				if (outputDiskBuffer.size() % 1000 == 0) {
192
//					for (String id : outputDiskBuffer) {
193
//						bw.write(id);
194
//						bw.newLine();
195
//					}
196
//					outputDiskBuffer.clear();
197
//				}
198
            }
199

    
200
            objsValidated++;
201

    
202
            if (objsValidated % jobStatusUpdateInterval == 0) {
203
                job.getParameters().put("recordsTested",
204
                        Integer.toString(objsValidated));
205
                blackboardHandler.getBlackboardHandler().ongoing(job);
206
            }
207
        } catch (Exception e) {
208
            logger.error("Error while proccessing record results");
209
            throw new ValidatorException("Error while proccessing record results",                    e);
210
        }
211
    }
212

    
213
    @Override
214
    public synchronized void finished(int jobId, Map<String, Object> jobContext) {
215
        try {
216

    
217
            logger.debug("Job " + jobId + " finished");
218

    
219
            internalJobsFinished++;
220
            if (internalJobsFinished == internalJobsSum) {
221

    
222
                logger.debug("internalJobsFinished == internalJobsSum: " + (internalJobsFinished == internalJobsSum));
223
                if (enableOutputToRS && eprSetOk) {
224
                    // send remaining xmls from buffers
225
//					if (outputRSBuffer.size() > 0) {
226
//						try {
227
//							resultSetService.populateRS(outputResultSetID,
228
//									outputRSBuffer);
229
//							outputRSBuffer.clear();
230
//						} catch (ResultSetServiceException e) {
231
//							logger.error("Error populating ResultSetService.",
232
//									e);
233
//						}
234
//					}
235
                    try {
236
                        queue.put("finished");
237

    
238
                    } catch (InterruptedException e) {
239
                        logger.error("Error finalizing queue", e);
240
                    }
241

    
242
                    if (queue.size() > 0) {
243
                        long time1 = Calendar.getInstance().getTimeInMillis();
244
                        activeThreads.getAndIncrement();
245
                        RSTask task = new RSTask(resultSetService, outputResultSetID, queue, activeThreads, allThreadsFinished);
246
                        rsExecutor.execute(task);
247
                        long time2 = Calendar.getInstance().getTimeInMillis();
248
                        logger.debug("Populating RS took " + ((time2 - time1))
249
                                + " milli seconds");
250
                    }
251

    
252
                    // finally, close rs.
253
                    logger.debug("active threads to finish: " + activeThreads.get());
254
                    logger.debug("trying to close result set");
255
                    while (activeThreads.get() > 0) {
256
                        logger.debug("waiting active threads to finish. Remaining: " + activeThreads.get());
257
                        synchronized (allThreadsFinished) {
258
                            allThreadsFinished.wait();
259
                        }
260
                        logger.debug("retrying to finish. Remaining: " + activeThreads.get());
261
                    }
262
                    logger.debug("closing result set");
263
                    resultSetService.closeRS(outputResultSetID);
264
                }
265
                if (enableOutputToDisk && diskSetOk) {
266
                    bw.write("FINISHED");
267
//					bw.newLine();
268
//					logger.debug("output disk buffer size: " + outputDiskBuffer.size());
269
//					if (outputDiskBuffer.size() > 0) {
270
//						for (String id : outputDiskBuffer) {
271
//							bw.write(id);
272
//							bw.newLine();
273
//						}
274
//						outputDiskBuffer.clear();
275
//					}
276
                    bw.close();
277
                }
278
                // job.getParameters().put("outputResultSetEpr",
279
                // EPRUtils.eprToXml(outputEpr));
280
                job.getParameters().put("jobId",
281
                        Integer.toString(validationJobId));
282

    
283
                job.getParameters().put("recordsTested", Integer.toString(objsValidated));
284
//				 job.getParameters().put(
285
//				 "recordsInResultSet",
286
//				 Integer.toString(resultSetService
287
//				 .getNumberOfElements(outputResultSetID)));
288
                // update job status
289
                blackboardHandler.getBlackboardHandler().done(job);
290

    
291
            } else {
292
                logger.debug("Waiting "
293
                        + (internalJobsSum - internalJobsFinished)
294
                        + " job(s) to finish");
295
            }
296
        } catch (Exception e) {
297
            logger.error("Error while finalizing successfull workflow job", e);
298
        }
299
    }
300

    
301
    @Override
302
    public synchronized void failed(int jobId, Map<String, Object> jobContext,
303
                                    Throwable t) {
304
        try {
305
            internalJobsFinished++;
306
            if (internalJobsFinished == internalJobsSum) {
307
                if (enableOutputToRS && eprSetOk) {
308
                    resultSetService.closeRS(outputResultSetID);
309
                }
310
                if (enableOutputToDisk && diskSetOk) {
311
                    outputDiskBuffer.clear();
312
                    bw.close();
313
                    File file = new File("/tmp/validator-wf/"
314
                            + validationJobId);
315
//					File file = new File("/tmp/validator-wf/"
316
//							+ job.getParameters().get("datasourceId"));
317
                    if (file.exists()) {
318
                        file.delete();
319
                    }
320
                }
321
                job.getParameters().put("jobId",
322
                        Integer.toString(validationJobId));
323

    
324
                // update job status
325
                blackboardHandler.getBlackboardHandler().failed(job, t);
326
            } else {
327
                logger.debug("Waiting "
328
                        + (internalJobsSum - internalJobsFinished)
329
                        + " job(s) to finish");
330
            }
331
        } catch (Exception e) {
332
            logger.error("Error while finalizing failed workflow job", e);
333
        }
334
    }
335

    
336
    private void sendToQueue(String xmlString) {
337
        logger.debug("received passed XMLresult");
338
        // add records
339
//		if (queue.size() == 100000) {
340
//			queue.clear();
341
//			resultSetService.
342
//		}
343
        try {
344
            queue.put(xmlString);
345
        } catch (InterruptedException e1) {
346
            logger.error("Error putting in queue.", e1);
347
        }
348
        if (queue.size() > 50) {
349
            long time1 = Calendar.getInstance().getTimeInMillis();
350
            RSTask task = new RSTask(resultSetService, outputResultSetID, queue, activeThreads, allThreadsFinished);
351
//			synchronized (counterLock) {
352
//				activeThreads ++;
353
//			}
354
            activeThreads.getAndIncrement();
355
            rsExecutor.execute(task);
356
            long time2 = Calendar.getInstance().getTimeInMillis();
357
            logger.debug("Populating RS took " + ((time2 - time1))
358
                    + " milli seconds");
359
        }
360
    }
361

    
362
    public RecordXMLBuilder getXmlBuilder() {
363
        return xmlBuilder;
364
    }
365

    
366
    public void setXmlBuilder(RecordXMLBuilder xmlBuilder) {
367
        this.xmlBuilder = xmlBuilder;
368
    }
369

    
370
    public BlackboardJob getJob() {
371
        return job;
372
    }
373

    
374
    public void setJob(BlackboardJob job) {
375
        this.job = job;
376
    }
377

    
378
    public ServiceLocator<ResultSetService> getResultSetServiceLocator() {
379
        return resultSetServiceLocator;
380
    }
381

    
382
    public void setResultSetServiceLocator(
383
            ServiceLocator<ResultSetService> resultSetServiceLocator) {
384
        this.resultSetServiceLocator = resultSetServiceLocator;
385
    }
386

    
387
    public BlackboardNotificationHandler<BlackboardServerHandler> getBlackboardHandler() {
388
        return blackboardHandler;
389
    }
390

    
391
    public void setBlackboardHandler(
392
            BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler) {
393
        this.blackboardHandler = blackboardHandler;
394
    }
395

    
396
    public TaskExecutor getRsExecutor() {
397
        return rsExecutor;
398
    }
399

    
400
    public void setRsExecutor(TaskExecutor rsExecutor) {
401
        this.rsExecutor = rsExecutor;
402
    }
403

    
404
    public int getInternalJobsSum() {
405
        return internalJobsSum;
406
    }
407

    
408
    public void setInternalJobsSum(int internalJobsSum) {
409
        this.internalJobsSum = internalJobsSum;
410
    }
411

    
412
    public int getValidationJobId() {
413
        return validationJobId;
414
    }
415

    
416
    public void setValidationJobId(int validationJobId) {
417
        this.validationJobId = validationJobId;
418
    }
419

    
420
    public int getJobStatusUpdateInterval() {
421
        return jobStatusUpdateInterval;
422
    }
423

    
424
    public void setJobStatusUpdateInterval(int jobStatusUpdateInterval) {
425
        this.jobStatusUpdateInterval = jobStatusUpdateInterval;
426
    }
427

    
428
    public boolean isEnableOutputToDisk() {
429
        return enableOutputToDisk;
430
    }
431

    
432
    public void setEnableOutputToDisk(boolean enableOutputToDisk) {
433
        this.enableOutputToDisk = enableOutputToDisk;
434
    }
435

    
436
    public boolean isEnableOutputToRS() {
437
        return enableOutputToRS;
438
    }
439

    
440
    public void setEnableOutputToRS(boolean enableOutputToRS) {
441
        this.enableOutputToRS = enableOutputToRS;
442
    }
443

    
444
}
(3-3/8)