Project

General

Profile

« Previous | Next » 

Revision 50092

fast decrease of the latch in case of empty response

View differences:

modules/dnet-openaire-exporter/trunk/dnet-openaire-exporter.iml
37 37
    <orderEntry type="library" name="Maven: org.apache.neethi:neethi:3.0.3" level="project" />
38 38
    <orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
39 39
    <orderEntry type="library" name="Maven: org.apache.cxf:cxf-rt-transports-http:3.1.5" level="project" />
40
    <orderEntry type="library" name="Maven: eu.dnetlib:cnr-service-common:2.1.7-SNAPSHOT" level="project" />
40
    <orderEntry type="module" module-name="cnr-service-common (1)" />
41 41
    <orderEntry type="library" name="Maven: org.quartz-scheduler:quartz:2.2.2" level="project" />
42 42
    <orderEntry type="library" name="Maven: c3p0:c3p0:0.9.1.1" level="project" />
43 43
    <orderEntry type="library" name="Maven: org.springframework:spring-tx:4.3.7.RELEASE" level="project" />
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/clients/DatasourceIndexClient.java
148 148
			final QueryResponse rsp = indexClient.query(query);
149 149
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
150 150
			if (log.isDebugEnabled()) {
151
				log.debug(String.format("got document %s", doc.toString()));
151
				log.debug(String.format("got document %s", doc.get("__indexrecordidentifier")));
152 152
			}
153 153

  
154 154
			return new IndexRecordsInfo()
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/clients/DatasourceDao.java
115 115
		return new Response().setDatasourceResponse(datasourceResponse).setHeader(new Header().setErrors(errors));
116 116
	}
117 117

  
118
	public Response searchByCountry(final String country, final Pageable pageable) {
118
	public Response searchByCountry(final String country, final Boolean managed, final Pageable pageable) {
119 119

  
120 120
		final List<DatasourceResponse> datasourceResponse = Lists.newArrayList();
121 121

  
......
123 123
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
124 124

  
125 125
		log.debug(String.format("search ds by country '%s'", country));
126
		dsRepository.findDistinctByOrganizationsCountryIgnoreCase(country, pageable).addCallback(getSearchCallback(outerLatch, errors, datasourceResponse));
127 126

  
127
		final ListenableFuture<Slice<Datasource>> datasources;
128

  
129
		if (managed == null) {
130
			datasources = dsRepository.findDistinctByOrganizationsCountryIgnoreCase(country, pageable);
131
		} else {
132
			datasources = dsRepository.findDistinctByManagedEqualsAndOrganizationsCountryIgnoreCase(managed, country, pageable);
133
		}
134

  
135
		datasources.addCallback(getSearchCallback(outerLatch, errors, datasourceResponse));
136

  
128 137
		waitLatch(outerLatch, errors, config.getRequestTimeout());
129 138

  
130 139
		return new Response().setDatasourceResponse(datasourceResponse).setHeader(new Header().setErrors(errors));
......
167 176

  
168 177
			@Override
169 178
			public void onSuccess(final Slice<Datasource> datasources) {
170
				datasources.forEach(d -> {
171
					final DatasourceResponse response = new DatasourceResponse();
172
					response.setDatasource(d);
173
					getAggregationHistory(d.getId(), outerLatch, errors, response);
174
					getIndexDsInfo(d.getId(), outerLatch, errors, response);
175
					datasourceResponse.add(response);
176
				});
177
				outerLatch.countDown();
179
				if (!datasources.hasContent()) {
180
					while (outerLatch.getCount() > 0) {
181
						log.debug(String.format("fast countdown for outer latch %s", outerLatch));
182
						outerLatch.countDown();
183
					}
184
				} else {
185
					datasources.forEach(d -> {
186
						final DatasourceResponse response = new DatasourceResponse();
187
						response.setDatasource(d);
188
						getAggregationHistory(d.getId(), outerLatch, errors, response);
189
						getIndexDsInfo(d.getId(), outerLatch, errors, response);
190
						datasourceResponse.add(response);
191
					});
192
					outerLatch.countDown();
193
				}
178 194
			}
179 195

  
180 196
			@Override
......
469 485
	}
470 486

  
471 487
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
488
		log.debug(String.format("thread %s awaits latch %s", Thread.currentThread().getId(), latch.toString()));
472 489
		try {
473
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
490
			final boolean await = latch.await(waitSeconds, TimeUnit.SECONDS);
491
			if (!await) {
474 492
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
493
			} else {
494
				log.debug(String.format("thread %s successfully awaied latch %s", Thread.currentThread().getId(), latch.toString()));
475 495
			}
476 496
		} catch (final InterruptedException e) {
477 497
			errors.offer(e);
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/DatasourcesApi.java
44 44
    @ApiResponses(value = {
45 45
            @ApiResponse(code = 200, message = "OK", response = DatasourceResponse[].class),
46 46
            @ApiResponse(code = 500, message = "unexpected error", response = Response.class) })
47
    List<DatasourceResponse> searchByCountry(String country, int page, int size);
47
    List<DatasourceResponse> searchByCountry(String country, Boolean managed, int page, int size);
48 48

  
49 49
    @ApiOperation(value = "search datasources by registering user", notes = "Returns list of Datasource details.", response = DatasourceResponse[].class)
50 50
    @ApiResponses(value = {
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/DatasourcesApiController.java
84 84
	@Override
85 85
	@RequestMapping(value = "/ds/search/country/{page}/{size}", produces = { "application/json" }, method = RequestMethod.GET)
86 86
	public List<DatasourceResponse> searchByCountry(
87
			@RequestParam final String country, @PathVariable final int page, @PathVariable final int size) {
87
			@RequestParam final String country,
88
			@RequestParam(required = false) final Boolean managed,
89
			@PathVariable final int page,
90
			@PathVariable final int size) {
88 91
		final StopWatch stop = StopWatch.createStarted();
89
		final Response rsp = dsDao.searchByCountry(country, new PageRequest(page, size));
92
		final Response rsp = dsDao.searchByCountry(country, managed, new PageRequest(page, size));
90 93
		rsp.getHeader().setTime(stop.getTime());
91 94
		if (log.isDebugEnabled()) {
92 95
			log.debug("searchByCountry: " + rsp.getHeader().toJson());

Also available in: Unified diff