1
|
package eu.dnetlib.validator.service.impls.listeners;
|
2
|
|
3
|
import java.util.ArrayList;
|
4
|
import java.util.Calendar;
|
5
|
import java.util.List;
|
6
|
import java.util.concurrent.BlockingQueue;
|
7
|
import java.util.concurrent.atomic.AtomicInteger;
|
8
|
|
9
|
import org.apache.log4j.Logger;
|
10
|
|
11
|
import eu.dnetlib.api.enabling.ResultSetService;
|
12
|
|
13
|
/**
|
14
|
* A Rule that is applied on a Validation Object.
|
15
|
*
|
16
|
* @author Nikon Gasparis
|
17
|
*
|
18
|
*/
|
19
|
public class RSTask implements Runnable {
|
20
|
|
21
|
private final ResultSetService resultSetService;
|
22
|
private final String outputResultSetID;
|
23
|
private final BlockingQueue<String> queue;
|
24
|
|
25
|
private final AtomicInteger activeThreads;
|
26
|
private final Object allThreadsFinished;
|
27
|
|
28
|
private boolean success;
|
29
|
private List<String> errors;
|
30
|
private Exception exception = null;;
|
31
|
|
32
|
|
33
|
public RSTask(ResultSetService resultSetService, String outputResultSetID,
|
34
|
BlockingQueue<String> queue, AtomicInteger activeThreads, Object allThreadsFinished) {
|
35
|
super();
|
36
|
this.resultSetService = resultSetService;
|
37
|
this.outputResultSetID = outputResultSetID;
|
38
|
this.queue = queue;
|
39
|
this.activeThreads = activeThreads;
|
40
|
this.allThreadsFinished = allThreadsFinished;
|
41
|
}
|
42
|
|
43
|
@Override
|
44
|
public void run() {
|
45
|
Logger log = Logger.getLogger(RSTask.class);
|
46
|
long time1 = Calendar.getInstance().getTimeInMillis();
|
47
|
try {
|
48
|
List <String> outputRSBuffer = new ArrayList<String>();
|
49
|
for (int i=0 ; i < 50 ; i++) {
|
50
|
String record = queue.take();
|
51
|
if (record.equalsIgnoreCase("finished")) {
|
52
|
// log.debug("closing result set");
|
53
|
// resultSetService.closeRS(outputResultSetID);
|
54
|
break;
|
55
|
}
|
56
|
else
|
57
|
outputRSBuffer.add(record);
|
58
|
}
|
59
|
resultSetService.populateRS(outputResultSetID, outputRSBuffer);
|
60
|
if (activeThreads.decrementAndGet() == 0) {
|
61
|
synchronized (allThreadsFinished) {
|
62
|
allThreadsFinished.notify();
|
63
|
}
|
64
|
}
|
65
|
} catch (Exception e) {
|
66
|
log.error("Error populating ResultSetService.", e);
|
67
|
}
|
68
|
long time2 = Calendar.getInstance().getTimeInMillis();
|
69
|
log.debug("Populating RS took " + ((time2 - time1))
|
70
|
+ " milli seconds");
|
71
|
}
|
72
|
|
73
|
public boolean isSuccess() {
|
74
|
return success;
|
75
|
}
|
76
|
|
77
|
public Exception getException() {
|
78
|
return exception;
|
79
|
}
|
80
|
|
81
|
public List<String> getErrors() {
|
82
|
return errors;
|
83
|
}
|
84
|
|
85
|
public void setErrors(List<String> errors) {
|
86
|
this.errors = errors;
|
87
|
}
|
88
|
|
89
|
}
|