83 |
83 |
@Autowired
|
84 |
84 |
private DatasourceApiRepository datasourceApiRepository;
|
85 |
85 |
|
86 |
|
private ListeningExecutorService service;
|
|
86 |
private ListeningExecutorService executor;
|
87 |
87 |
|
88 |
88 |
@PostConstruct
|
89 |
89 |
public void init() {
|
90 |
|
service = MoreExecutors.listeningDecorator(
|
|
90 |
executor = MoreExecutors.listeningDecorator(
|
91 |
91 |
new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
|
92 |
92 |
new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
|
93 |
93 |
}
|
... | ... | |
356 |
356 |
final Queue<Throwable> errors,
|
357 |
357 |
final DatasourceResponse datasourceResponse) {
|
358 |
358 |
Futures.addCallback(
|
359 |
|
service.submit(() -> isLookupClient.calculateCurrentIndexDsInfo(errors)),
|
|
359 |
executor.submit(() -> isLookupClient.calculateCurrentIndexDsInfo(errors)),
|
360 |
360 |
new FutureCallback<IndexDsInfo>() {
|
361 |
361 |
|
362 |
362 |
public void onSuccess(final IndexDsInfo info) {
|
... | ... | |
364 |
364 |
final CountDownLatch innerLatch = new CountDownLatch(2);
|
365 |
365 |
|
366 |
366 |
Futures.addCallback(
|
367 |
|
service.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
|
|
367 |
executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
|
368 |
368 |
new FutureCallback<IndexRecordsInfo>() {
|
369 |
369 |
public void onSuccess(IndexRecordsInfo info) {
|
370 |
370 |
datasourceResponse.setIndexRecords(info.getTotal()).setFundedContent(info.getFunded()).setLastIndexingDate(info.getDate());
|
... | ... | |
374 |
374 |
errors.offer(e);
|
375 |
375 |
innerLatch.countDown();
|
376 |
376 |
}
|
377 |
|
});
|
|
377 |
}, executor);
|
378 |
378 |
|
379 |
379 |
Futures.addCallback(
|
380 |
|
service.submit(() ->
|
|
380 |
executor.submit(() ->
|
381 |
381 |
objectStoreClient.getObjectStoreSize(
|
382 |
382 |
isLookupClient.getObjectStoreId(dsId, errors),
|
383 |
383 |
errors)),
|
... | ... | |
393 |
393 |
errors.offer(e);
|
394 |
394 |
innerLatch.countDown();
|
395 |
395 |
}
|
396 |
|
}
|
397 |
|
);
|
|
396 |
}, executor);
|
398 |
397 |
waitLatch(innerLatch, errors, config.getRequestTimeout());
|
399 |
398 |
|
400 |
399 |
outerLatch.countDown();
|
... | ... | |
405 |
404 |
errors.offer(e);
|
406 |
405 |
outerLatch.countDown();
|
407 |
406 |
}
|
408 |
|
});
|
|
407 |
}, executor);
|
409 |
408 |
}
|
410 |
409 |
|
411 |
410 |
private void getAggregationHistory(final String dsId,
|
... | ... | |
413 |
412 |
final Queue<Throwable> errors,
|
414 |
413 |
final DatasourceResponse datasourceResponse) {
|
415 |
414 |
Futures.addCallback(
|
416 |
|
service.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
|
|
415 |
executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
|
417 |
416 |
new FutureCallback<List<AggregationInfo>>() {
|
418 |
417 |
public void onSuccess(List<AggregationInfo> info) {
|
419 |
418 |
setAggregationHistory(datasourceResponse, info);
|
... | ... | |
424 |
423 |
errors.offer(e);
|
425 |
424 |
outerLatch.countDown();
|
426 |
425 |
}
|
427 |
|
});
|
|
426 |
}, executor);
|
428 |
427 |
}
|
429 |
428 |
|
430 |
429 |
private void setAggregationHistory(DatasourceResponse datasourceResponse, final List<AggregationInfo> info) {
|
passing the executor service to the Future callbacks