Project

General

Profile

1
package eu.dnetlib.msro.workflows.resultset;
2

    
3
import java.util.List;
4

    
5
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
6
import org.apache.commons.lang.StringUtils;
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9

    
10
import com.google.common.collect.Lists;
11
import com.googlecode.sarasvati.GraphProcess;
12

    
13
import eu.dnetlib.enabling.resultset.ResultSet;
14
import eu.dnetlib.enabling.resultset.ResultSetAware;
15
import eu.dnetlib.enabling.resultset.ResultSetListener;
16
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
17
import eu.dnetlib.enabling.resultset.rmi.ResultSetService;
18

    
19
public class ProcessCountingResultSetListener implements ResultSetListener, ResultSetAware {
20

    
21
	private int count;
22

    
23
	private GraphProcess process;
24
	private ResultSetService inputService;
25
	private String inputRsId;
26
	private ResultSet outputResulset;
27
	private int inputSize;
28
	private boolean inaccurate = false;
29

    
30
	private final static String RS_CLOSED = "closed";
31
	/**
32
	 * Logger.
33
	 */
34
	private static final Log log = LogFactory.getLog(ProcessCountingResultSetListener.class);
35

    
36
	public ProcessCountingResultSetListener(final GraphProcess process, final ResultSetService inputService, final String inputRsId) throws ResultSetException {
37
		super();
38
		this.process = process;
39
		this.inputService = inputService;
40
		this.inputRsId = inputRsId;
41
		this.inputSize = inputService.getNumberOfElements(inputRsId);
42
		this.count = 0;
43
	}
44

    
45
	@Override
46
	public List<String> getResult(final int from, final int to) {
47
		if (process.isCanceled()) {
48
			log.warn("Process canceled, closing result set");
49
			this.outputResulset.close();
50
			return Lists.newArrayList();
51
		}
52
		try {
53
			this.count = to;
54
			if (count >= this.inputSize) {
55
				closeRSIfNeeded();
56
			}
57
			return inputService.getResult(inputRsId, from, to, "WAITING");
58
		} catch (ResultSetException e) {
59
			log.error("Error fetching records from resultset: " + inputRsId);
60
			throw new RuntimeException(e);
61
		}
62
	}
63

    
64
	@Override
65
	public int getSize() {
66
		if (process.isCanceled()) {
67
			this.outputResulset.close();
68
			return count;
69
		}
70
		try {
71
			int size = inputService.getNumberOfElements(inputRsId);
72
			if (this.inputSize != size) {
73
				this.inputSize = size;
74
				this.inaccurate = true;
75
			}
76
			this.inputSize = size;
77
			if (count >= this.inputSize) {
78
				this.inaccurate = false;
79
				closeRSIfNeeded();
80

    
81
			}
82
			return this.inputSize;
83
		} catch (ResultSetException e) {
84
			log.error("Error fetching records from resultset: " + inputRsId);
85
			throw new RuntimeException(e);
86
		}
87

    
88
	}
89

    
90
	private void closeRSIfNeeded() throws ResultSetException {
91
		String inputClassName = inputService.getProperty(inputRsId, "classSimpleName");
92
		if (StringUtils.isNotBlank(inputClassName) && inputClassName.equalsIgnoreCase("TransientPushResultSetImpl")) {
93
			// Push result set has inaccurate progress
94
			this.inaccurate = true;
95
			log.info("Not closing the counting resultset unless the underlying push result is already closed ");
96
			if (inputService.getRSStatus(inputRsId).equals(RS_CLOSED)) {
97
				outputResulset.close();
98
			}
99
		} else {
100
			outputResulset.close();
101
		}
102
	}
103

    
104
	@Override
105
	public void setResultSet(final ResultSet outputResulset) {
106
		this.outputResulset = outputResulset;
107
	}
108

    
109
	public int getCount() {
110
		return count;
111
	}
112

    
113
	public void setCount(final int count) {
114
		this.count = count;
115
	}
116

    
117
	public boolean isInaccurate() {
118
		return inaccurate;
119
	}
120

    
121
}
(2-2/2)