Project

General

Profile

« Previous | Next » 

Revision 49929

passing the executor service to the Future callbacks

View differences:

modules/dnet-openaire-exporter/trunk/dnet-openaire-exporter.iml
18 18
    <orderEntry type="inheritedJdk" />
19 19
    <orderEntry type="sourceFolder" forTests="false" />
20 20
    <orderEntry type="library" name="Maven: eu.dnetlib:dnet-pace-core:2.5.1" level="project" />
21
    <orderEntry type="library" name="Maven: org.apache.solr:solr-solrj:7.1.0" level="project" />
22
    <orderEntry type="library" name="Maven: org.apache.commons:commons-math3:3.6.1" level="project" />
23
    <orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.10" level="project" />
24
    <orderEntry type="library" name="Maven: org.noggit:noggit:0.8" level="project" />
25
    <orderEntry type="library" name="Maven: org.apache.solr:solr-solrj:7.1.0" level="project" />
26
    <orderEntry type="library" name="Maven: org.apache.commons:commons-math3:3.6.1" level="project" />
27
    <orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.10" level="project" />
28
    <orderEntry type="library" name="Maven: org.noggit:noggit:0.8" level="project" />
29 21
    <orderEntry type="library" name="Maven: eu.dnetlib:cnr-rmi-api:2.6.2-SNAPSHOT" level="project" />
30 22
    <orderEntry type="library" name="Maven: org.apache.cxf:cxf-core:3.1.5" level="project" />
31 23
    <orderEntry type="library" name="Maven: org.apache.ws.xmlschema:xmlschema-core:2.2.1" level="project" />
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/clients/DatasourceDao.java
83 83
	@Autowired
84 84
	private DatasourceApiRepository datasourceApiRepository;
85 85

  
86
	private ListeningExecutorService service;
86
	private ListeningExecutorService executor;
87 87

  
88 88
	@PostConstruct
89 89
	public void init() {
90
		service = MoreExecutors.listeningDecorator(
90
		executor = MoreExecutors.listeningDecorator(
91 91
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
92 92
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
93 93
	}
......
356 356
			final Queue<Throwable> errors,
357 357
			final DatasourceResponse datasourceResponse) {
358 358
		Futures.addCallback(
359
				service.submit(() -> isLookupClient.calculateCurrentIndexDsInfo(errors)),
359
				executor.submit(() -> isLookupClient.calculateCurrentIndexDsInfo(errors)),
360 360
				new FutureCallback<IndexDsInfo>() {
361 361

  
362 362
					public void onSuccess(final IndexDsInfo info) {
......
364 364
						final CountDownLatch innerLatch = new CountDownLatch(2);
365 365

  
366 366
						Futures.addCallback(
367
								service.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
367
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
368 368
								new FutureCallback<IndexRecordsInfo>() {
369 369
									public void onSuccess(IndexRecordsInfo info) {
370 370
										datasourceResponse.setIndexRecords(info.getTotal()).setFundedContent(info.getFunded()).setLastIndexingDate(info.getDate());
......
374 374
										errors.offer(e);
375 375
										innerLatch.countDown();
376 376
									}
377
						});
377
						}, executor);
378 378

  
379 379
						Futures.addCallback(
380
								service.submit(() ->
380
								executor.submit(() ->
381 381
										objectStoreClient.getObjectStoreSize(
382 382
												isLookupClient.getObjectStoreId(dsId, errors),
383 383
												errors)),
......
393 393
										errors.offer(e);
394 394
										innerLatch.countDown();
395 395
									}
396
								}
397
						);
396
								}, executor);
398 397
						waitLatch(innerLatch, errors, config.getRequestTimeout());
399 398

  
400 399
						outerLatch.countDown();
......
405 404
						errors.offer(e);
406 405
						outerLatch.countDown();
407 406
					}
408
		});
407
		}, executor);
409 408
	}
410 409

  
411 410
	private void getAggregationHistory(final String dsId,
......
413 412
			final Queue<Throwable> errors,
414 413
			final DatasourceResponse datasourceResponse) {
415 414
		Futures.addCallback(
416
				service.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
415
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
417 416
				new FutureCallback<List<AggregationInfo>>() {
418 417
					public void onSuccess(List<AggregationInfo> info) {
419 418
						setAggregationHistory(datasourceResponse, info);
......
424 423
						errors.offer(e);
425 424
						outerLatch.countDown();
426 425
					}
427
		});
426
		}, executor);
428 427
	}
429 428

  
430 429
	private void setAggregationHistory(DatasourceResponse datasourceResponse, final List<AggregationInfo> info) {

Also available in: Unified diff