Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.schemaorg;
2

    
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

    
6
import java.util.Iterator;
7
import java.util.NoSuchElementException;
8
import java.util.concurrent.ArrayBlockingQueue;
9
import java.util.concurrent.TimeUnit;
10

    
11
public class RepositoryQueueIterator implements Iterator<String> {
12
	private static final Log log = LogFactory.getLog(RepositoryQueueIterator.class);
13

    
14
	public static class Options {
15
		private Boolean blockPolling;
16
		private long pollTimeout;
17
		private TimeUnit pollTimeoutUnit;
18

    
19
		public Boolean getBlockPolling() {
20
			return blockPolling;
21
		}
22

    
23
		public void setBlockPolling(Boolean blockPolling) {
24
			this.blockPolling = blockPolling;
25
		}
26

    
27
		public long getPollTimeout() {
28
			return pollTimeout;
29
		}
30

    
31
		public void setPollTimeout(long pollTimeout) {
32
			this.pollTimeout = pollTimeout;
33
		}
34

    
35
		public TimeUnit getPollTimeoutUnit() {
36
			return pollTimeoutUnit;
37
		}
38

    
39
		public void setPollTimeoutUnit(TimeUnit pollTimeoutUnit) {
40
			this.pollTimeoutUnit = pollTimeoutUnit;
41
		}
42
	}
43

    
44
	private ArrayBlockingQueue<String> queue;
45
	private Options options;
46
	private boolean hasTerminated;
47

    
48
	public RepositoryQueueIterator(Options options, ArrayBlockingQueue<String> queue) {
49
		this.options = options;
50
		this.queue = queue;
51
		this.hasTerminated = false;
52
	}
53

    
54
	@Override
55
	public boolean hasNext() {
56
		if(this.hasTerminated) return false;
57
		return true;
58
	}
59

    
60
	@Override
61
	public String next() {
62
		String next = this.poll();
63
		if (next != null && next.equalsIgnoreCase(RepositoryIterable.TerminationHint)) {
64
			this.hasTerminated = true;
65
			next = null;
66
		}
67

    
68
		return next;
69
	}
70

    
71
	private String poll(){
72
		if(this.options.getBlockPolling()) {
73
			try {
74
				return this.queue.poll(this.options.getPollTimeout(), this.options.getPollTimeoutUnit());
75
			} catch (InterruptedException ex) {
76
				log.warn(String.format("could not poll elements from queue for more than %s %s. throwing", this.options.getPollTimeout(), this.options.getPollTimeoutUnit()));
77
				throw new NoSuchElementException(ex.getMessage());
78
			}
79
		}
80
		else {
81
			return this.queue.poll();
82
		}
83
	}
84
}
(6-6/11)