Project

General

Profile

1
package eu.dnetlib.msro.workflows.resultset;
2

    
3
import java.util.List;
4

    
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7

    
8
import com.google.common.collect.Lists;
9
import com.googlecode.sarasvati.GraphProcess;
10

    
11
import eu.dnetlib.enabling.resultset.ResultSet;
12
import eu.dnetlib.enabling.resultset.ResultSetAware;
13
import eu.dnetlib.enabling.resultset.ResultSetListener;
14
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
15
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
16

    
17
public class ProcessCountingResultSetListener implements ResultSetListener, ResultSetAware {
18

    
19
	private int count;
20

    
21
	private GraphProcess process;
22
	private ResultSetService inputService;
23
	private String inputRsId;
24
	private ResultSet outputResulset;
25
	private int inputSize;
26
	private boolean inaccurate = false;
27

    
28
	private final static String RS_CLOSED = "closed";
29
	/**
30
	 * Logger.
31
	 */
32
	private static final Log log = LogFactory.getLog(ProcessCountingResultSetListener.class);
33

    
34
	public ProcessCountingResultSetListener(final GraphProcess process, final ResultSetService inputService, final String inputRsId) throws ResultSetException {
35
		super();
36
		this.process = process;
37
		this.inputService = inputService;
38
		this.inputRsId = inputRsId;
39
		this.inputSize = inputService.getNumberOfElements(inputRsId);
40
		this.count = 0;
41
	}
42

    
43
	@Override
44
	public List<String> getResult(final int from, final int to) {
45
		if (process.isCanceled()) {
46
			this.outputResulset.close();
47
			return Lists.newArrayList();
48
		}
49
		try {
50
			this.count = to;
51
			if (count >= this.inputSize) {
52
				closeRSIfNeeded();
53
			}
54
			return inputService.getResult(inputRsId, from, to, "WAITING");
55
		} catch (ResultSetException e) {
56
			log.error("Error fetching records from resultset: " + inputRsId);
57
			throw new RuntimeException(e);
58
		}
59
	}
60

    
61
	@Override
62
	public int getSize() {
63
		if (process.isCanceled()) {
64
			this.outputResulset.close();
65
			return count;
66
		}
67
		try {
68
			int size = inputService.getNumberOfElements(inputRsId);
69
			if (this.inputSize != size) {
70
				this.inputSize = size;
71
				this.inaccurate = true;
72
			}
73
			this.inputSize = size;
74
			if (count >= this.inputSize) {
75
				closeRSIfNeeded();
76
				this.inaccurate = false;
77
			}
78
			return this.inputSize;
79
		} catch (ResultSetException e) {
80
			log.error("Error fetching records from resultset: " + inputRsId);
81
			throw new RuntimeException(e);
82
		}
83

    
84
	}
85

    
86
	private void closeRSIfNeeded() throws ResultSetException {
87
		if (inputService.getProperty(inputRsId, "classSimpleName").equalsIgnoreCase("TransientPushResultSetImpl")) {
88
			log.info("Not closing the counting resultset unless the underlying push result is already closed ");
89
			if (inputService.getRSStatus(inputRsId).equals(RS_CLOSED)) {
90
				outputResulset.close();
91
			}
92
		} else {
93
			outputResulset.close();
94
		}
95
	}
96

    
97
	@Override
98
	public void setResultSet(final ResultSet outputResulset) {
99
		this.outputResulset = outputResulset;
100
	}
101

    
102
	public int getCount() {
103
		return count;
104
	}
105

    
106
	public void setCount(final int count) {
107
		this.count = count;
108
	}
109

    
110
	public boolean isInaccurate() {
111
		return inaccurate;
112
	}
113

    
114
}
(2-2/2)