1 |
44690
|
claudio.at
|
package eu.dnetlib.datasource.publisher.clients;
|
2 |
44275
|
claudio.at
|
|
3 |
|
|
import java.util.List;
|
4 |
44690
|
claudio.at
|
import java.util.Queue;
|
5 |
|
|
import java.util.concurrent.*;
|
6 |
46876
|
claudio.at
|
import javax.annotation.Resource;
|
7 |
44275
|
claudio.at
|
|
8 |
44690
|
claudio.at
|
import com.google.common.util.concurrent.*;
|
9 |
44743
|
claudio.at
|
import eu.dnetlib.datasource.publisher.ApiException;
|
10 |
44765
|
claudio.at
|
import eu.dnetlib.datasource.publisher.clients.utils.IndexDsInfo;
|
11 |
|
|
import eu.dnetlib.datasource.publisher.clients.utils.IndexRecordsInfo;
|
12 |
46876
|
claudio.at
|
import eu.dnetlib.datasource.publisher.model.AggregationInfo;
|
13 |
|
|
import eu.dnetlib.datasource.publisher.model.AggregationStage;
|
14 |
|
|
import eu.dnetlib.datasource.publisher.model.DatasourceResponse;
|
15 |
|
|
import eu.dnetlib.datasource.publisher.model.db.Datasource;
|
16 |
|
|
import eu.dnetlib.datasource.publisher.model.db.SearchInterfacesEntry;
|
17 |
|
|
import eu.dnetlib.datasource.publisher.repository.DatasourceRepository;
|
18 |
|
|
import eu.dnetlib.datasource.publisher.repository.SearchApiRepository;
|
19 |
44690
|
claudio.at
|
import org.apache.commons.logging.Log;
|
20 |
|
|
import org.apache.commons.logging.LogFactory;
|
21 |
44275
|
claudio.at
|
import org.springframework.beans.factory.annotation.Autowired;
|
22 |
46718
|
claudio.at
|
import org.springframework.beans.factory.annotation.Value;
|
23 |
46876
|
claudio.at
|
import org.springframework.data.domain.Pageable;
|
24 |
|
|
import org.springframework.util.concurrent.ListenableFutureCallback;
|
25 |
44275
|
claudio.at
|
|
26 |
|
|
/**
|
27 |
|
|
* Created by claudio on 20/10/2016.
|
28 |
|
|
*/
|
29 |
|
|
public class DatasourceInfoRetriever {
|
30 |
|
|
|
31 |
44690
|
claudio.at
|
private static final Log log = LogFactory.getLog(DatasourceInfoRetriever.class);
|
32 |
|
|
|
33 |
44381
|
claudio.at
|
@Autowired
|
34 |
|
|
private MongoLoggerClient mongoLoggerClient;
|
35 |
44275
|
claudio.at
|
|
36 |
|
|
@Autowired
|
37 |
44381
|
claudio.at
|
private DatasourceIndexClient datasourceIndexClient;
|
38 |
44275
|
claudio.at
|
|
39 |
|
|
@Autowired
|
40 |
46876
|
claudio.at
|
private DatasourceRepository dsRepository;
|
41 |
44275
|
claudio.at
|
|
42 |
44690
|
claudio.at
|
@Autowired
|
43 |
46876
|
claudio.at
|
private SearchApiRepository searchApiRepository;
|
44 |
44690
|
claudio.at
|
|
45 |
46876
|
claudio.at
|
@Resource(name = "datasourceIsLookupClient")
|
46 |
|
|
private ISLookupClient isLookupClient;
|
47 |
|
|
|
48 |
44690
|
claudio.at
|
private ListeningExecutorService service;
|
49 |
|
|
|
50 |
|
|
private final static int WORKERS = 100;
|
51 |
|
|
|
52 |
46718
|
claudio.at
|
@Value("${datasource.publisher.timeout}")
|
53 |
|
|
private int timeout = 10;
|
54 |
|
|
|
55 |
44690
|
claudio.at
|
public DatasourceInfoRetriever() {
|
56 |
|
|
service = MoreExecutors.listeningDecorator(
|
57 |
|
|
new ScheduledThreadPoolExecutor(WORKERS,
|
58 |
|
|
new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
|
59 |
|
|
}
|
60 |
|
|
|
61 |
46876
|
claudio.at
|
public List<String> listIds(final Pageable pageable) throws ApiException {
|
62 |
|
|
return dsRepository.findAll(pageable)
|
63 |
|
|
.map(d -> d.getId())
|
64 |
|
|
.getContent();
|
65 |
44743
|
claudio.at
|
}
|
66 |
44690
|
claudio.at
|
|
67 |
44743
|
claudio.at
|
public ClientResponse getInfo(final String dsId) {
|
68 |
|
|
|
69 |
44690
|
claudio.at
|
final CountDownLatch outerLatch = new CountDownLatch(3);
|
70 |
|
|
final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
|
71 |
44765
|
claudio.at
|
final DatasourceResponse datasourceResponse = new DatasourceResponse();
|
72 |
44275
|
claudio.at
|
|
73 |
44690
|
claudio.at
|
Futures.addCallback(getAggregationHistory(dsId), new FutureCallback<List<AggregationInfo>>() {
|
74 |
|
|
public void onSuccess(List<AggregationInfo> info) {
|
75 |
44765
|
claudio.at
|
setAggregationHistory(datasourceResponse, info);
|
76 |
44690
|
claudio.at
|
outerLatch.countDown();
|
77 |
|
|
}
|
78 |
|
|
public void onFailure(Throwable e) {
|
79 |
|
|
errors.offer(e);
|
80 |
|
|
outerLatch.countDown();
|
81 |
|
|
}
|
82 |
|
|
});
|
83 |
44275
|
claudio.at
|
|
84 |
46876
|
claudio.at
|
dsRepository.findOneById(dsId).addCallback(new ListenableFutureCallback<Datasource>() {
|
85 |
44690
|
claudio.at
|
@Override
|
86 |
|
|
public void onSuccess(final Datasource datasource) {
|
87 |
44765
|
claudio.at
|
datasourceResponse.setDatasource(datasource);
|
88 |
44690
|
claudio.at
|
outerLatch.countDown();
|
89 |
|
|
}
|
90 |
44381
|
claudio.at
|
|
91 |
44690
|
claudio.at
|
@Override
|
92 |
|
|
public void onFailure(final Throwable e) {
|
93 |
|
|
errors.offer(e);
|
94 |
|
|
outerLatch.countDown();
|
95 |
|
|
}
|
96 |
|
|
});
|
97 |
|
|
|
98 |
|
|
Futures.addCallback(calculateCurrentIndexDsInfo(), new FutureCallback<IndexDsInfo>() {
|
99 |
|
|
|
100 |
|
|
public void onSuccess(final IndexDsInfo info) {
|
101 |
|
|
|
102 |
|
|
final CountDownLatch innerLatch = new CountDownLatch(1);
|
103 |
|
|
|
104 |
|
|
Futures.addCallback(getIndexInfo(dsId, info), new FutureCallback<IndexRecordsInfo>() {
|
105 |
|
|
public void onSuccess(IndexRecordsInfo info) {
|
106 |
44765
|
claudio.at
|
datasourceResponse.setIndexRecords(info.getCount()).setLastIndexingDate(info.getDate());
|
107 |
44690
|
claudio.at
|
innerLatch.countDown();
|
108 |
|
|
}
|
109 |
|
|
public void onFailure(Throwable e) {
|
110 |
|
|
errors.offer(e);
|
111 |
|
|
innerLatch.countDown();
|
112 |
|
|
}
|
113 |
|
|
});
|
114 |
46718
|
claudio.at
|
waitLatch(innerLatch, errors, timeout);
|
115 |
44690
|
claudio.at
|
|
116 |
|
|
outerLatch.countDown();
|
117 |
|
|
}
|
118 |
|
|
|
119 |
|
|
public void onFailure(final Throwable e) {
|
120 |
|
|
errors.offer(e);
|
121 |
|
|
outerLatch.countDown();
|
122 |
|
|
}
|
123 |
|
|
});
|
124 |
|
|
|
125 |
46718
|
claudio.at
|
waitLatch(outerLatch, errors, timeout);
|
126 |
44690
|
claudio.at
|
|
127 |
46876
|
claudio.at
|
/*
|
128 |
44690
|
claudio.at
|
if (!errors.isEmpty()) {
|
129 |
46876
|
claudio.at
|
datasourceResponse.getResponseHeader().setError(Joiner.on("\n").skipNulls().join(errors.stream().map(e -> e.getMessage()).collect(Collectors.toList())));
|
130 |
|
|
log.error(Joiner.on("\n").skipNulls().join(errors.stream().map(e -> ExceptionUtils.getFullStackTrace(e)).collect(Collectors.toList())));
|
131 |
44690
|
claudio.at
|
}
|
132 |
46876
|
claudio.at
|
*/
|
133 |
44690
|
claudio.at
|
|
134 |
44765
|
claudio.at
|
return new ClientResponse().datasourceInfo(datasourceResponse).errors(errors);
|
135 |
44690
|
claudio.at
|
}
|
136 |
|
|
|
137 |
46876
|
claudio.at
|
public List<SearchInterfacesEntry> searchInterface(final String field, final String value) {
|
138 |
|
|
switch (field) {
|
139 |
|
|
case "__SEARCH__":
|
140 |
|
|
return searchApiRepository
|
141 |
|
|
.findByRepoidContainingOrRepoNameContainingOrAlternativeNameContainingOrRepoPrefixContainingOrRepoOrganizationContainingAllIgnoreCase(
|
142 |
|
|
value, value, value, value, value);
|
143 |
|
|
case "country":
|
144 |
|
|
break;
|
145 |
|
|
case "type":
|
146 |
|
|
break;
|
147 |
|
|
case "protocol":
|
148 |
|
|
break;
|
149 |
|
|
case "compliance":
|
150 |
|
|
break;
|
151 |
|
|
case "active":
|
152 |
|
|
break;
|
153 |
|
|
default:
|
154 |
|
|
throw new IllegalArgumentException("");
|
155 |
|
|
}
|
156 |
|
|
return null;
|
157 |
44690
|
claudio.at
|
}
|
158 |
|
|
|
159 |
|
|
private ListenableFuture<IndexDsInfo> calculateCurrentIndexDsInfo() {
|
160 |
46876
|
claudio.at
|
return service.submit(() -> isLookupClient.calculateCurrentIndexDsInfo());
|
161 |
44690
|
claudio.at
|
}
|
162 |
|
|
|
163 |
|
|
private ListenableFuture<IndexRecordsInfo> getIndexInfo(final String dsId, final IndexDsInfo info) {
|
164 |
46876
|
claudio.at
|
return service.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info));
|
165 |
44690
|
claudio.at
|
}
|
166 |
|
|
|
167 |
|
|
private ListenableFuture<List<AggregationInfo>> getAggregationHistory(final String dsId) {
|
168 |
46876
|
claudio.at
|
return service.submit(() -> mongoLoggerClient.getAggregationHistory(dsId));
|
169 |
44690
|
claudio.at
|
}
|
170 |
|
|
|
171 |
44765
|
claudio.at
|
private void setAggregationHistory(DatasourceResponse datasourceResponse, final List<AggregationInfo> info) {
|
172 |
|
|
datasourceResponse.setAggregationHistory(info);
|
173 |
44690
|
claudio.at
|
if (!info.isEmpty()) {
|
174 |
46876
|
claudio.at
|
datasourceResponse
|
175 |
|
|
.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
|
176 |
|
|
.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
|
177 |
44275
|
claudio.at
|
}
|
178 |
|
|
}
|
179 |
|
|
|
180 |
44690
|
claudio.at
|
private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
|
181 |
|
|
try {
|
182 |
|
|
if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
|
183 |
|
|
errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
|
184 |
|
|
}
|
185 |
|
|
} catch (final InterruptedException e) {
|
186 |
|
|
errors.offer(e);
|
187 |
|
|
}
|
188 |
|
|
}
|
189 |
|
|
|
190 |
44275
|
claudio.at
|
}
|