Project

General

Profile

1
package eu.dnetlib.validator.service.impls.listeners;
2

    
3
import java.util.ArrayList;
4
import java.util.Calendar;
5
import java.util.List;
6
import java.util.concurrent.BlockingQueue;
7
import java.util.concurrent.atomic.AtomicInteger;
8

    
9
import org.apache.log4j.Logger;
10

    
11
import eu.dnetlib.api.enabling.ResultSetService;
12

    
13
/**
14
 * A Rule that is applied on a Validation Object.
15
 * 
16
 * @author Nikon Gasparis
17
 *
18
 */
19
public class RSTask implements Runnable {
20

    
21
	private final ResultSetService resultSetService;
22
	private final String outputResultSetID;
23
	private final BlockingQueue<String> queue;
24
	
25
	private final AtomicInteger activeThreads;
26
	private final Object allThreadsFinished;
27
	
28
	private boolean success;
29
	private List<String> errors;
30
	private Exception exception = null;;
31

    
32
	
33
public RSTask(ResultSetService resultSetService, String outputResultSetID,
34
		BlockingQueue<String> queue, AtomicInteger activeThreads, Object allThreadsFinished) {
35
		super();
36
		this.resultSetService = resultSetService;
37
		this.outputResultSetID = outputResultSetID;
38
		this.queue = queue;
39
		this.activeThreads = activeThreads;
40
		this.allThreadsFinished = allThreadsFinished;
41
	}
42

    
43
	@Override
44
	public void run() {
45
		Logger log = Logger.getLogger(RSTask.class);
46
		long time1 = Calendar.getInstance().getTimeInMillis();
47
		try {
48
			List <String> outputRSBuffer = new ArrayList<String>();
49
			for (int i=0 ; i < 50 ; i++) {
50
				String record = queue.take();
51
				if (record.equalsIgnoreCase("finished")) {
52
//					log.debug("closing result set");
53
//					resultSetService.closeRS(outputResultSetID);
54
					break;
55
				}
56
				else 
57
					outputRSBuffer.add(record);
58
			}
59
			resultSetService.populateRS(outputResultSetID, outputRSBuffer);
60
			if (activeThreads.decrementAndGet() == 0) {
61
				synchronized (allThreadsFinished) {
62
					allThreadsFinished.notify();
63
				}
64
			}
65
		} catch (Exception e) {
66
			log.error("Error populating ResultSetService.", e);
67
		}
68
		long time2 = Calendar.getInstance().getTimeInMillis();
69
		log.debug("Populating RS took " + ((time2 - time1))
70
				+ " milli seconds");
71
	}
72

    
73
	public boolean isSuccess() {
74
		return success;
75
	}
76

    
77
	public Exception getException() {
78
		return exception;
79
	}
80

    
81
	public List<String> getErrors() {
82
		return errors;
83
	}
84

    
85
	public void setErrors(List<String> errors) {
86
		this.errors = errors;
87
	}
88

    
89
}
(5-5/8)