Revision 50092
Added by Claudio Atzori over 6 years ago
modules/dnet-openaire-exporter/trunk/dnet-openaire-exporter.iml | ||
---|---|---|
37 | 37 |
<orderEntry type="library" name="Maven: org.apache.neethi:neethi:3.0.3" level="project" /> |
38 | 38 |
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" /> |
39 | 39 |
<orderEntry type="library" name="Maven: org.apache.cxf:cxf-rt-transports-http:3.1.5" level="project" /> |
40 |
<orderEntry type="library" name="Maven: eu.dnetlib:cnr-service-common:2.1.7-SNAPSHOT" level="project" />
|
|
40 |
<orderEntry type="module" module-name="cnr-service-common (1)" />
|
|
41 | 41 |
<orderEntry type="library" name="Maven: org.quartz-scheduler:quartz:2.2.2" level="project" /> |
42 | 42 |
<orderEntry type="library" name="Maven: c3p0:c3p0:0.9.1.1" level="project" /> |
43 | 43 |
<orderEntry type="library" name="Maven: org.springframework:spring-tx:4.3.7.RELEASE" level="project" /> |
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/clients/DatasourceIndexClient.java | ||
---|---|---|
148 | 148 |
final QueryResponse rsp = indexClient.query(query); |
149 | 149 |
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument()); |
150 | 150 |
if (log.isDebugEnabled()) { |
151 |
log.debug(String.format("got document %s", doc.toString()));
|
|
151 |
log.debug(String.format("got document %s", doc.get("__indexrecordidentifier")));
|
|
152 | 152 |
} |
153 | 153 |
|
154 | 154 |
return new IndexRecordsInfo() |
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/clients/DatasourceDao.java | ||
---|---|---|
115 | 115 |
return new Response().setDatasourceResponse(datasourceResponse).setHeader(new Header().setErrors(errors)); |
116 | 116 |
} |
117 | 117 |
|
118 |
public Response searchByCountry(final String country, final Pageable pageable) { |
|
118 |
public Response searchByCountry(final String country, final Boolean managed, final Pageable pageable) {
|
|
119 | 119 |
|
120 | 120 |
final List<DatasourceResponse> datasourceResponse = Lists.newArrayList(); |
121 | 121 |
|
... | ... | |
123 | 123 |
final Queue<Throwable> errors = new ConcurrentLinkedQueue<>(); |
124 | 124 |
|
125 | 125 |
log.debug(String.format("search ds by country '%s'", country)); |
126 |
dsRepository.findDistinctByOrganizationsCountryIgnoreCase(country, pageable).addCallback(getSearchCallback(outerLatch, errors, datasourceResponse)); |
|
127 | 126 |
|
127 |
final ListenableFuture<Slice<Datasource>> datasources; |
|
128 |
|
|
129 |
if (managed == null) { |
|
130 |
datasources = dsRepository.findDistinctByOrganizationsCountryIgnoreCase(country, pageable); |
|
131 |
} else { |
|
132 |
datasources = dsRepository.findDistinctByManagedEqualsAndOrganizationsCountryIgnoreCase(managed, country, pageable); |
|
133 |
} |
|
134 |
|
|
135 |
datasources.addCallback(getSearchCallback(outerLatch, errors, datasourceResponse)); |
|
136 |
|
|
128 | 137 |
waitLatch(outerLatch, errors, config.getRequestTimeout()); |
129 | 138 |
|
130 | 139 |
return new Response().setDatasourceResponse(datasourceResponse).setHeader(new Header().setErrors(errors)); |
... | ... | |
167 | 176 |
|
168 | 177 |
@Override |
169 | 178 |
public void onSuccess(final Slice<Datasource> datasources) { |
170 |
datasources.forEach(d -> { |
|
171 |
final DatasourceResponse response = new DatasourceResponse(); |
|
172 |
response.setDatasource(d); |
|
173 |
getAggregationHistory(d.getId(), outerLatch, errors, response); |
|
174 |
getIndexDsInfo(d.getId(), outerLatch, errors, response); |
|
175 |
datasourceResponse.add(response); |
|
176 |
}); |
|
177 |
outerLatch.countDown(); |
|
179 |
if (!datasources.hasContent()) { |
|
180 |
while (outerLatch.getCount() > 0) { |
|
181 |
log.debug(String.format("fast countdown for outer latch %s", outerLatch)); |
|
182 |
outerLatch.countDown(); |
|
183 |
} |
|
184 |
} else { |
|
185 |
datasources.forEach(d -> { |
|
186 |
final DatasourceResponse response = new DatasourceResponse(); |
|
187 |
response.setDatasource(d); |
|
188 |
getAggregationHistory(d.getId(), outerLatch, errors, response); |
|
189 |
getIndexDsInfo(d.getId(), outerLatch, errors, response); |
|
190 |
datasourceResponse.add(response); |
|
191 |
}); |
|
192 |
outerLatch.countDown(); |
|
193 |
} |
|
178 | 194 |
} |
179 | 195 |
|
180 | 196 |
@Override |
... | ... | |
469 | 485 |
} |
470 | 486 |
|
471 | 487 |
private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) { |
488 |
log.debug(String.format("thread %s awaits latch %s", Thread.currentThread().getId(), latch.toString())); |
|
472 | 489 |
try { |
473 |
if (!latch.await(waitSeconds, TimeUnit.SECONDS)) { |
|
490 |
final boolean await = latch.await(waitSeconds, TimeUnit.SECONDS); |
|
491 |
if (!await) { |
|
474 | 492 |
errors.offer(new TimeoutException("Waiting for requests to complete has timed out.")); |
493 |
} else { |
|
494 |
log.debug(String.format("thread %s successfully awaied latch %s", Thread.currentThread().getId(), latch.toString())); |
|
475 | 495 |
} |
476 | 496 |
} catch (final InterruptedException e) { |
477 | 497 |
errors.offer(e); |
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/DatasourcesApi.java | ||
---|---|---|
44 | 44 |
@ApiResponses(value = { |
45 | 45 |
@ApiResponse(code = 200, message = "OK", response = DatasourceResponse[].class), |
46 | 46 |
@ApiResponse(code = 500, message = "unexpected error", response = Response.class) }) |
47 |
List<DatasourceResponse> searchByCountry(String country, int page, int size); |
|
47 |
List<DatasourceResponse> searchByCountry(String country, Boolean managed, int page, int size);
|
|
48 | 48 |
|
49 | 49 |
@ApiOperation(value = "search datasources by registering user", notes = "Returns list of Datasource details.", response = DatasourceResponse[].class) |
50 | 50 |
@ApiResponses(value = { |
modules/dnet-openaire-exporter/trunk/src/main/java/eu/dnetlib/openaire/exporter/datasource/DatasourcesApiController.java | ||
---|---|---|
84 | 84 |
@Override |
85 | 85 |
@RequestMapping(value = "/ds/search/country/{page}/{size}", produces = { "application/json" }, method = RequestMethod.GET) |
86 | 86 |
public List<DatasourceResponse> searchByCountry( |
87 |
@RequestParam final String country, @PathVariable final int page, @PathVariable final int size) { |
|
87 |
@RequestParam final String country, |
|
88 |
@RequestParam(required = false) final Boolean managed, |
|
89 |
@PathVariable final int page, |
|
90 |
@PathVariable final int size) { |
|
88 | 91 |
final StopWatch stop = StopWatch.createStarted(); |
89 |
final Response rsp = dsDao.searchByCountry(country, new PageRequest(page, size)); |
|
92 |
final Response rsp = dsDao.searchByCountry(country, managed, new PageRequest(page, size));
|
|
90 | 93 |
rsp.getHeader().setTime(stop.getTime()); |
91 | 94 |
if (log.isDebugEnabled()) { |
92 | 95 |
log.debug("searchByCountry: " + rsp.getHeader().toJson()); |
Also available in: Unified diff
fast decrease of the latch in case of empty response