Project

General

Profile

« Previous | Next » 

Revision 53688

added configurable producer timeout

View differences:

modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/schemaorg/SchemaOrgPlugin.java
57 57
	private KaggleRepositoryIterable.Options compileKaggleRepositoryOptions(InterfaceDescriptor interfaceDescriptor) throws MalformedURLException {
58 58
		KaggleRepositoryIterable.Options kaggleRepositoryOptions = new KaggleRepositoryIterable.Options();
59 59
		kaggleRepositoryOptions.setQueueSize(Utils.getAsInt(interfaceDescriptor.getParams(), "httpapi-kaggle_queueSize", 100));
60
		kaggleRepositoryOptions.setPutTimeout(Utils.getAsLong(interfaceDescriptor.getParams(), "httpapi-kaggle_producerBlockPollingTimeout", 20));
61
		kaggleRepositoryOptions.setPutTimeoutUnit(Utils.getAsEnum(interfaceDescriptor.getParams(), "httpapi-kaggle_producerBlockPollingTimeoutUnit", TimeUnit.MINUTES, TimeUnit.class));
60 62
		kaggleRepositoryOptions.setCharset(Utils.getAsCharset(interfaceDescriptor.getParams(), "httpapi-kaggle_APICharset", StandardCharsets.UTF_8));
61 63
		kaggleRepositoryOptions.setQueryUrl(Utils.getAsString(interfaceDescriptor.getParams(), "httpapi-kaggle_queryUrl", null));
62 64
		kaggleRepositoryOptions.setQueryPagePlaceholder(Utils.getAsString(interfaceDescriptor.getParams(), "httpapi-kaggle_queryPagePlaceholder", "{PAGE}"));
......
96 98
	private SitemapIndexRepositoryIterable.Options compileSitemapIndexRepositoryOptions(InterfaceDescriptor interfaceDescriptor) throws MalformedURLException {
97 99
		SitemapIndexRepositoryIterable.Options sitemapIndexRepositoryIterableOptions = new SitemapIndexRepositoryIterable.Options();
98 100
		sitemapIndexRepositoryIterableOptions.setQueueSize(Utils.getAsInt(interfaceDescriptor.getParams(), "sitemap_queueSize", 100));
101
		sitemapIndexRepositoryIterableOptions.setPutTimeout(Utils.getAsLong(interfaceDescriptor.getParams(), "sitemap_producerBlockPollingTimeout", 20));
102
		sitemapIndexRepositoryIterableOptions.setPutTimeoutUnit(Utils.getAsEnum(interfaceDescriptor.getParams(), "sitemap_producerBlockPollingTimeoutUnit", TimeUnit.MINUTES, TimeUnit.class));
99 103
		sitemapIndexRepositoryIterableOptions.setRepositoryQueueIteratorOptions(this.compileRepositoryQueueOptions(interfaceDescriptor));
100 104
		sitemapIndexRepositoryIterableOptions.setSitemapFileIteratorOptions(this.compileSitemapFileOptions(interfaceDescriptor));
101 105
		sitemapIndexRepositoryIterableOptions.setSitemapIndexIteratorOptions(this.compileSitemapIndexOptions(interfaceDescriptor));
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/schemaorg/httpapi/kaggle/KaggleRepositoryIterable.java
12 12
import java.net.URL;
13 13
import java.nio.charset.Charset;
14 14
import java.util.Iterator;
15
import java.util.NoSuchElementException;
15 16
import java.util.concurrent.ArrayBlockingQueue;
16 17
import java.util.concurrent.ExecutorService;
17 18
import java.util.concurrent.Executors;
19
import java.util.concurrent.TimeUnit;
18 20

  
19 21
public class KaggleRepositoryIterable implements HttpApiRepositoryIterable {
20 22
	private static final Log log = LogFactory.getLog(KaggleRepositoryIterable.class);
......
27 29
		private String responsePropertyDatasetList;
28 30
		private String responsePropertyDatasetUrl;
29 31
		private String responseBaseDatasetUrl;
32
		private long putTimeout;
33
		private TimeUnit putTimeoutUnit;
30 34

  
31 35
		private RepositoryQueueIterator.Options repositoryQueueIteratorOptions;
32 36

  
33 37
		private int queueSize;
34 38

  
39
		public long getPutTimeout() {
40
			return putTimeout;
41
		}
42

  
43
		public void setPutTimeout(long putTimeout) {
44
			this.putTimeout = putTimeout;
45
		}
46

  
47
		public TimeUnit getPutTimeoutUnit() {
48
			return putTimeoutUnit;
49
		}
50

  
51
		public void setPutTimeoutUnit(TimeUnit putTimeoutUnit) {
52
			this.putTimeoutUnit = putTimeoutUnit;
53
		}
54

  
35 55
		public int getQueueSize() {
36 56
			return queueSize;
37 57
		}
......
161 181

  
162 182
						log.debug("adding endpoint in queue");
163 183
						log.debug("queue size: " + queue.size());
164
						queue.put(endpoint);
184

  
185
						try {
186
							queue.offer(endpoint, options.getPutTimeout(), options.getPutTimeoutUnit());
187
						} catch (InterruptedException ex) {
188
							log.warn(String.format("could not put elements from queue for more than %s %s. breaking", options.getPutTimeout(), options.getPutTimeoutUnit()));
189
							break;
190
						}
165 191
						log.debug("endpoint added in queue");
166 192
						log.debug("queue size: " + queue.size());
167 193
					}
......
172 198
				log.error("problem execution harvesting", ex);
173 199
			} finally {
174 200
				try {
175
					queue.put(RepositoryIterable.TerminationHint);
201
					queue.offer(RepositoryIterable.TerminationHint, options.getPutTimeout(), options.getPutTimeoutUnit());
176 202
				} catch (Exception ex) {
177 203
					log.fatal("could not add termination hint. the process will not terminate gracefully", ex);
178 204
				}
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/schemaorg/sitemapindex/SitemapIndexRepositoryIterable.java
10 10
import java.util.concurrent.ArrayBlockingQueue;
11 11
import java.util.concurrent.ExecutorService;
12 12
import java.util.concurrent.Executors;
13
import java.util.concurrent.TimeUnit;
13 14

  
14 15
public class SitemapIndexRepositoryIterable implements RepositoryIterable {
15 16
	private static final Log log = LogFactory.getLog(SitemapIndexRepositoryIterable.class);
......
18 19
		private SitemapIndexIterator.Options sitemapIndexIteratorOptions;
19 20
		private SitemapFileIterator.Options sitemapFileIteratorOptions;
20 21
		private RepositoryQueueIterator.Options repositoryQueueIteratorOptions;
22
		private long putTimeout;
23
		private TimeUnit putTimeoutUnit;
21 24

  
22 25
		private int queueSize;
23 26

  
27
		public long getPutTimeout() {
28
			return putTimeout;
29
		}
30

  
31
		public void setPutTimeout(long putTimeout) {
32
			this.putTimeout = putTimeout;
33
		}
34

  
35
		public TimeUnit getPutTimeoutUnit() {
36
			return putTimeoutUnit;
37
		}
38

  
39
		public void setPutTimeoutUnit(TimeUnit putTimeoutUnit) {
40
			this.putTimeoutUnit = putTimeoutUnit;
41
		}
42

  
24 43
		public int getQueueSize() {
25 44
			return queueSize;
26 45
		}
......
103 122

  
104 123
						log.debug("adding endpoint in queue");
105 124
						log.debug("queue size: " + queue.size());
106
						queue.put(endpoint);
125
						try {
126
							queue.offer(endpoint, options.getPutTimeout(), options.getPutTimeoutUnit());
127
						} catch (InterruptedException ex) {
128
							log.warn(String.format("could not put elements from queue for more than %s %s. breaking", options.getPutTimeout(), options.getPutTimeoutUnit()));
129
							break;
130
						}
107 131
						log.debug("endpoint added in queue");
108 132
						log.debug("queue size: " + queue.size());
109 133
					}
......
113 137
			}
114 138
			finally {
115 139
				try {
116
					queue.put(RepositoryIterable.TerminationHint);
140
					queue.offer(RepositoryIterable.TerminationHint, options.getPutTimeout(), options.getPutTimeoutUnit());
117 141
				} catch (Exception ex) {
118 142
					log.fatal("could not add termination hint. the process will not terminate gracefully", ex);
119 143
				}

Also available in: Unified diff