Revision 53688
Added by Giorgos Papanikos about 6 years ago
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/schemaorg/SchemaOrgPlugin.java | ||
---|---|---|
57 | 57 |
private KaggleRepositoryIterable.Options compileKaggleRepositoryOptions(InterfaceDescriptor interfaceDescriptor) throws MalformedURLException { |
58 | 58 |
KaggleRepositoryIterable.Options kaggleRepositoryOptions = new KaggleRepositoryIterable.Options(); |
59 | 59 |
kaggleRepositoryOptions.setQueueSize(Utils.getAsInt(interfaceDescriptor.getParams(), "httpapi-kaggle_queueSize", 100)); |
60 |
kaggleRepositoryOptions.setPutTimeout(Utils.getAsLong(interfaceDescriptor.getParams(), "httpapi-kaggle_producerBlockPollingTimeout", 20)); |
|
61 |
kaggleRepositoryOptions.setPutTimeoutUnit(Utils.getAsEnum(interfaceDescriptor.getParams(), "httpapi-kaggle_producerBlockPollingTimeoutUnit", TimeUnit.MINUTES, TimeUnit.class)); |
|
60 | 62 |
kaggleRepositoryOptions.setCharset(Utils.getAsCharset(interfaceDescriptor.getParams(), "httpapi-kaggle_APICharset", StandardCharsets.UTF_8)); |
61 | 63 |
kaggleRepositoryOptions.setQueryUrl(Utils.getAsString(interfaceDescriptor.getParams(), "httpapi-kaggle_queryUrl", null)); |
62 | 64 |
kaggleRepositoryOptions.setQueryPagePlaceholder(Utils.getAsString(interfaceDescriptor.getParams(), "httpapi-kaggle_queryPagePlaceholder", "{PAGE}")); |
... | ... | |
96 | 98 |
private SitemapIndexRepositoryIterable.Options compileSitemapIndexRepositoryOptions(InterfaceDescriptor interfaceDescriptor) throws MalformedURLException { |
97 | 99 |
SitemapIndexRepositoryIterable.Options sitemapIndexRepositoryIterableOptions = new SitemapIndexRepositoryIterable.Options(); |
98 | 100 |
sitemapIndexRepositoryIterableOptions.setQueueSize(Utils.getAsInt(interfaceDescriptor.getParams(), "sitemap_queueSize", 100)); |
101 |
sitemapIndexRepositoryIterableOptions.setPutTimeout(Utils.getAsLong(interfaceDescriptor.getParams(), "sitemap_producerBlockPollingTimeout", 20)); |
|
102 |
sitemapIndexRepositoryIterableOptions.setPutTimeoutUnit(Utils.getAsEnum(interfaceDescriptor.getParams(), "sitemap_producerBlockPollingTimeoutUnit", TimeUnit.MINUTES, TimeUnit.class)); |
|
99 | 103 |
sitemapIndexRepositoryIterableOptions.setRepositoryQueueIteratorOptions(this.compileRepositoryQueueOptions(interfaceDescriptor)); |
100 | 104 |
sitemapIndexRepositoryIterableOptions.setSitemapFileIteratorOptions(this.compileSitemapFileOptions(interfaceDescriptor)); |
101 | 105 |
sitemapIndexRepositoryIterableOptions.setSitemapIndexIteratorOptions(this.compileSitemapIndexOptions(interfaceDescriptor)); |
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/schemaorg/httpapi/kaggle/KaggleRepositoryIterable.java | ||
---|---|---|
12 | 12 |
import java.net.URL; |
13 | 13 |
import java.nio.charset.Charset; |
14 | 14 |
import java.util.Iterator; |
15 |
import java.util.NoSuchElementException; |
|
15 | 16 |
import java.util.concurrent.ArrayBlockingQueue; |
16 | 17 |
import java.util.concurrent.ExecutorService; |
17 | 18 |
import java.util.concurrent.Executors; |
19 |
import java.util.concurrent.TimeUnit; |
|
18 | 20 |
|
19 | 21 |
public class KaggleRepositoryIterable implements HttpApiRepositoryIterable { |
20 | 22 |
private static final Log log = LogFactory.getLog(KaggleRepositoryIterable.class); |
... | ... | |
27 | 29 |
private String responsePropertyDatasetList; |
28 | 30 |
private String responsePropertyDatasetUrl; |
29 | 31 |
private String responseBaseDatasetUrl; |
32 |
private long putTimeout; |
|
33 |
private TimeUnit putTimeoutUnit; |
|
30 | 34 |
|
31 | 35 |
private RepositoryQueueIterator.Options repositoryQueueIteratorOptions; |
32 | 36 |
|
33 | 37 |
private int queueSize; |
34 | 38 |
|
39 |
/**
 * Returns the numeric part of the producer's queue-insertion timeout.
 * The unit is supplied separately by {@link #getPutTimeoutUnit()}.
 *
 * In this revision the producer code in this file passes
 * options.getPutTimeout() and options.getPutTimeoutUnit() to
 * queue.offer(...), both for endpoints and for the termination hint.
 * SchemaOrgPlugin fills the value from the
 * "httpapi-kaggle_producerBlockPollingTimeout" parameter, defaulting to 20.
 *
 * NOTE(review): assuming queue is a java.util.concurrent.BlockingQueue, a
 * timed offer signals an elapsed timeout by returning false rather than by
 * throwing; the visible call sites do not inspect the result - confirm that
 * dropping the element on timeout is intended.
 *
 * @return the timeout amount, expressed in {@link #getPutTimeoutUnit()} units
 */
public long getPutTimeout() { |
|
40 |
return putTimeout; |
|
41 |
} |
|
42 |
|
|
43 |
/**
 * Stores the numeric part of the producer's queue-insertion timeout.
 * No validation is performed; the value is assigned as given.
 *
 * SchemaOrgPlugin calls this with the
 * "httpapi-kaggle_producerBlockPollingTimeout" parameter (default 20).
 *
 * @param putTimeout timeout amount, interpreted in the unit set through
 *                   {@link #setPutTimeoutUnit(TimeUnit)}
 */
public void setPutTimeout(long putTimeout) { |
|
44 |
this.putTimeout = putTimeout; |
|
45 |
} |
|
46 |
|
|
47 |
/**
 * Returns the time unit that qualifies {@link #getPutTimeout()}.
 *
 * SchemaOrgPlugin fills it from the
 * "httpapi-kaggle_producerBlockPollingTimeoutUnit" parameter, defaulting to
 * TimeUnit.MINUTES. No default is assigned in the lines visible here, so the
 * result may be null if the setter was never called - TODO confirm.
 *
 * @return the unit of the producer's queue-insertion timeout
 */
public TimeUnit getPutTimeoutUnit() { |
|
48 |
return putTimeoutUnit; |
|
49 |
} |
|
50 |
|
|
51 |
/**
 * Stores the time unit that qualifies the put timeout.
 * The argument is assigned as given; null is not rejected here.
 *
 * SchemaOrgPlugin calls this with the
 * "httpapi-kaggle_producerBlockPollingTimeoutUnit" parameter
 * (default TimeUnit.MINUTES).
 *
 * @param putTimeoutUnit unit in which {@link #getPutTimeout()} is expressed
 */
public void setPutTimeoutUnit(TimeUnit putTimeoutUnit) { |
|
52 |
this.putTimeoutUnit = putTimeoutUnit; |
|
53 |
} |
|
54 |
|
|
35 | 55 |
/**
 * Returns the configured queue size.
 *
 * SchemaOrgPlugin fills it from the "httpapi-kaggle_queueSize" parameter,
 * defaulting to 100. Presumably this is the capacity of the producer queue;
 * the construction of the queue is not visible here - TODO confirm.
 *
 * @return the configured queue size
 */
public int getQueueSize() { |
36 | 56 |
return queueSize; |
37 | 57 |
} |
... | ... | |
161 | 181 |
|
162 | 182 |
log.debug("adding endpoint in queue"); |
163 | 183 |
log.debug("queue size: " + queue.size()); |
164 |
queue.put(endpoint); |
|
184 |
|
|
185 |
try { |
|
186 |
queue.offer(endpoint, options.getPutTimeout(), options.getPutTimeoutUnit()); |
|
187 |
} catch (InterruptedException ex) { |
|
188 |
log.warn(String.format("could not put elements from queue for more than %s %s. breaking", options.getPutTimeout(), options.getPutTimeoutUnit())); |
|
189 |
break; |
|
190 |
} |
|
165 | 191 |
log.debug("endpoint added in queue"); |
166 | 192 |
log.debug("queue size: " + queue.size()); |
167 | 193 |
} |
... | ... | |
172 | 198 |
log.error("problem execution harvesting", ex); |
173 | 199 |
} finally { |
174 | 200 |
try { |
175 |
queue.put(RepositoryIterable.TerminationHint);
|
|
201 |
queue.offer(RepositoryIterable.TerminationHint, options.getPutTimeout(), options.getPutTimeoutUnit());
|
|
176 | 202 |
} catch (Exception ex) { |
177 | 203 |
log.fatal("could not add termination hint. the process will not terminate gracefully", ex); |
178 | 204 |
} |
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/schemaorg/sitemapindex/SitemapIndexRepositoryIterable.java | ||
---|---|---|
10 | 10 |
import java.util.concurrent.ArrayBlockingQueue; |
11 | 11 |
import java.util.concurrent.ExecutorService; |
12 | 12 |
import java.util.concurrent.Executors; |
13 |
import java.util.concurrent.TimeUnit; |
|
13 | 14 |
|
14 | 15 |
public class SitemapIndexRepositoryIterable implements RepositoryIterable { |
15 | 16 |
private static final Log log = LogFactory.getLog(SitemapIndexRepositoryIterable.class); |
... | ... | |
18 | 19 |
private SitemapIndexIterator.Options sitemapIndexIteratorOptions; |
19 | 20 |
private SitemapFileIterator.Options sitemapFileIteratorOptions; |
20 | 21 |
private RepositoryQueueIterator.Options repositoryQueueIteratorOptions; |
22 |
private long putTimeout; |
|
23 |
private TimeUnit putTimeoutUnit; |
|
21 | 24 |
|
22 | 25 |
private int queueSize; |
23 | 26 |
|
27 |
/**
 * Returns the numeric part of the producer's queue-insertion timeout.
 * The unit is supplied separately by {@link #getPutTimeoutUnit()}.
 *
 * In this revision the producer code in this file passes
 * options.getPutTimeout() and options.getPutTimeoutUnit() to
 * queue.offer(...), both for endpoints and for the termination hint.
 * SchemaOrgPlugin fills the value from the
 * "sitemap_producerBlockPollingTimeout" parameter, defaulting to 20.
 *
 * NOTE(review): assuming queue is a java.util.concurrent.BlockingQueue, a
 * timed offer signals an elapsed timeout by returning false rather than by
 * throwing; the visible call sites do not inspect the result - confirm that
 * dropping the element on timeout is intended.
 *
 * @return the timeout amount, expressed in {@link #getPutTimeoutUnit()} units
 */
public long getPutTimeout() { |
|
28 |
return putTimeout; |
|
29 |
} |
|
30 |
|
|
31 |
/**
 * Stores the numeric part of the producer's queue-insertion timeout.
 * No validation is performed; the value is assigned as given.
 *
 * SchemaOrgPlugin calls this with the
 * "sitemap_producerBlockPollingTimeout" parameter (default 20).
 *
 * @param putTimeout timeout amount, interpreted in the unit set through
 *                   {@link #setPutTimeoutUnit(TimeUnit)}
 */
public void setPutTimeout(long putTimeout) { |
|
32 |
this.putTimeout = putTimeout; |
|
33 |
} |
|
34 |
|
|
35 |
/**
 * Returns the time unit that qualifies {@link #getPutTimeout()}.
 *
 * SchemaOrgPlugin fills it from the
 * "sitemap_producerBlockPollingTimeoutUnit" parameter, defaulting to
 * TimeUnit.MINUTES. No default is assigned in the lines visible here, so the
 * result may be null if the setter was never called - TODO confirm.
 *
 * @return the unit of the producer's queue-insertion timeout
 */
public TimeUnit getPutTimeoutUnit() { |
|
36 |
return putTimeoutUnit; |
|
37 |
} |
|
38 |
|
|
39 |
/**
 * Stores the time unit that qualifies the put timeout.
 * The argument is assigned as given; null is not rejected here.
 *
 * SchemaOrgPlugin calls this with the
 * "sitemap_producerBlockPollingTimeoutUnit" parameter
 * (default TimeUnit.MINUTES).
 *
 * @param putTimeoutUnit unit in which {@link #getPutTimeout()} is expressed
 */
public void setPutTimeoutUnit(TimeUnit putTimeoutUnit) { |
|
40 |
this.putTimeoutUnit = putTimeoutUnit; |
|
41 |
} |
|
42 |
|
|
24 | 43 |
/**
 * Returns the configured queue size.
 *
 * SchemaOrgPlugin fills it from the "sitemap_queueSize" parameter,
 * defaulting to 100. Presumably this is the capacity of the producer queue;
 * the construction of the queue is not visible here - TODO confirm.
 *
 * @return the configured queue size
 */
public int getQueueSize() { |
25 | 44 |
return queueSize; |
26 | 45 |
} |
... | ... | |
103 | 122 |
|
104 | 123 |
log.debug("adding endpoint in queue"); |
105 | 124 |
log.debug("queue size: " + queue.size()); |
106 |
queue.put(endpoint); |
|
125 |
try { |
|
126 |
queue.offer(endpoint, options.getPutTimeout(), options.getPutTimeoutUnit()); |
|
127 |
} catch (InterruptedException ex) { |
|
128 |
log.warn(String.format("could not put elements from queue for more than %s %s. breaking", options.getPutTimeout(), options.getPutTimeoutUnit())); |
|
129 |
break; |
|
130 |
} |
|
107 | 131 |
log.debug("endpoint added in queue"); |
108 | 132 |
log.debug("queue size: " + queue.size()); |
109 | 133 |
} |
... | ... | |
113 | 137 |
} |
114 | 138 |
finally { |
115 | 139 |
try { |
116 |
queue.put(RepositoryIterable.TerminationHint);
|
|
140 |
queue.offer(RepositoryIterable.TerminationHint, options.getPutTimeout(), options.getPutTimeoutUnit());
|
|
117 | 141 |
} catch (Exception ex) { |
118 | 142 |
log.fatal("could not add termination hint. the process will not terminate gracefully", ex); |
119 | 143 |
} |
Also available in: Unified diff
added configurable producer timeout