Project

General

Profile

1 49994 claudio.at
package eu.dnetlib.openaire.exporter.datasource;
2
3
import java.util.List;
4
import java.util.Queue;
5 50143 claudio.at
import java.util.concurrent.CountDownLatch;
6
import java.util.concurrent.ScheduledThreadPoolExecutor;
7
import java.util.concurrent.TimeUnit;
8
import java.util.concurrent.TimeoutException;
9 49994 claudio.at
import java.util.stream.Collectors;
10
import javax.annotation.PostConstruct;
11
12
import com.google.common.collect.Lists;
13 50143 claudio.at
import com.google.common.collect.Queues;
14 49994 claudio.at
import com.google.common.util.concurrent.*;
15
import eu.dnetlib.OpenaireExporterConfig;
16
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
17
import eu.dnetlib.enabling.datasources.common.AggregationStage;
18
import eu.dnetlib.enabling.datasources.common.DatasourceManagerException;
19
import eu.dnetlib.openaire.exporter.datasource.clients.*;
20
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
21
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
22 50143 claudio.at
import eu.dnetlib.openaire.exporter.model.ApiDetailsResponse;
23 49994 claudio.at
import eu.dnetlib.openaire.exporter.model.ConversionUtils;
24 50143 claudio.at
import eu.dnetlib.openaire.exporter.model.DatasourceResponse;
25
import eu.dnetlib.openaire.exporter.model.datasource.*;
26 49994 claudio.at
import eu.dnetlib.openaire.exporter.model.datasource.db.ApiDbEntry;
27
import eu.dnetlib.openaire.exporter.model.datasource.db.DatasourceDbEntry;
28
import eu.dnetlib.openaire.exporter.vocabularies.Country;
29
import org.apache.commons.lang3.exception.ExceptionUtils;
30
import org.apache.commons.logging.Log;
31
import org.apache.commons.logging.LogFactory;
32
import org.springframework.beans.factory.annotation.Autowired;
33 50143 claudio.at
import org.springframework.data.domain.Page;
34 49994 claudio.at
import org.springframework.stereotype.Component;
35
36 50143 claudio.at
import static eu.dnetlib.openaire.exporter.datasource.clients.ResponseUtils.apiResponse;
37
import static eu.dnetlib.openaire.exporter.datasource.clients.ResponseUtils.datasourceResponse;
38
39 49994 claudio.at
@Component
40
public class DatasourceManagerCore {
41
42
	private static final Log log = LogFactory.getLog(DatasourceManagerCore.class);
43
44
	@Autowired
45
	private MongoLoggerClient mongoLoggerClient;
46
47
	@Autowired
48
	private ISLookupClient isLookupClient;
49
50
	@Autowired
51
	private ObjectStoreClient objectStoreClient;
52
53
	@Autowired
54
	private DatasourceIndexClient datasourceIndexClient;
55
56
	@Autowired
57
	private VocabularyClient vocabularyClient;
58
59
	@Autowired
60
	private DatasourceDao dsDao;
61
62
	@Autowired
63
	private OpenaireExporterConfig config;
64
65
	private ListeningExecutorService executor;
66
67
	@PostConstruct
68
	public void init() {
69
		executor = MoreExecutors.listeningDecorator(
70
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
71
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
72
	}
73
74
	public List<Country> listCountries() throws DatasourceManagerException {
75
		return dsDao.listCountries();
76
	}
77
78 50143 claudio.at
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
79
			throws DatasourceManagerException {
80
		final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
81
		final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
82
		final CountDownLatch outerLatch = new CountDownLatch(2);
83 49994 claudio.at
84 50143 claudio.at
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
85
		if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
86
			dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(ConversionUtils.asDetails(d), outerLatch, errors)));
87
			waitLatch(outerLatch, errors, config.getRequestTimeout());
88
		}
89 49994 claudio.at
90 50143 claudio.at
		return datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
91 49994 claudio.at
	}
92
93 50143 claudio.at
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DatasourceManagerException {
94
		return dsDao.findBaseURLs(requestFilter, page, size);
95 49994 claudio.at
	}
96
97 50143 claudio.at
	public ApiDetailsResponse getApis(final String dsId) throws DatasourceManagerException {
98 49994 claudio.at
		final List<ApiDbEntry> apis = dsDao.getApis(dsId);
99 50143 claudio.at
		final List<ApiDetails> api = apis.stream()
100 49994 claudio.at
				.map(ConversionUtils::asDetails)
101
				.collect(Collectors.toList());
102 50143 claudio.at
		return apiResponse(api, api.size());
103 49994 claudio.at
	}
104
105
	public void setManaged(final String dsId, final boolean managed) throws DatasourceManagerException {
106
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
107
		dsDao.setManaged(dsId, managed);
108
	}
109
110
	public boolean isManaged(final String dsId) throws DatasourceManagerException {
111
		return dsDao.isManaged(dsId);
112
	}
113
114
	public boolean exist(final DatasourceDetails d) throws DatasourceManagerException {
115
		return ((DatasourceDaoImpl) dsDao).existDs(d.getId());
116
	}
117
118
	public void save(final DatasourceDetails d) throws DatasourceManagerException {
119
		dsDao.saveDs(ConversionUtils.asDbEntry(d));
120 50143 claudio.at
121
		//TODO add also to the IS
122 49994 claudio.at
	}
123
124
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DatasourceManagerException {
125
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
126
		dsDao.updateDatasourceName(dsId, officialname, englishname);
127 50143 claudio.at
128
		//TODO update also to the IS
129 49994 claudio.at
	}
130
131
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DatasourceManagerException {
132
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
133
		dsDao.updateCoordinates(dsId, latitude, longitude);
134 50143 claudio.at
135
		//TODO update also to the IS
136 49994 claudio.at
	}
137
138
	public void updateApiBaseurl(final String apiId, final String baseUrl) throws DatasourceManagerException {
139
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
140
		dsDao.updateBaseUrl(apiId, baseUrl);
141 50143 claudio.at
142
		//TODO update also to the IS
143 49994 claudio.at
	}
144
145
	public void updateTimezone(final String dsId, final String timezone) throws DatasourceManagerException {
146
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
147
		dsDao.updateTimezone(dsId, timezone);
148 50143 claudio.at
149
		//TODO update also to the IS
150 49994 claudio.at
	}
151
152
	public void updateDsTypology(final String dsId, final String typology) throws DatasourceManagerException {
153
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
154
		dsDao.updateTypology(dsId, typology);
155 50143 claudio.at
156
		//TODO update also to the IS
157 49994 claudio.at
	}
158
159
	public void updateApiCompatibility(final String apiId, final String compliance, final boolean override) throws DatasourceManagerException {
160
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
161
		dsDao.updateCompliance(null, apiId, compliance, override);
162 50143 claudio.at
163
		//TODO update also to the IS
164 49994 claudio.at
	}
165
166
	public void deleteDs(final String dsId) throws DatasourceManagerException {
167
		log.info(String.format("deleted datasource '%s'", dsId));
168
		dsDao.deleteDs(dsId);
169
	}
170
171 50143 claudio.at
	public void deleteApi(final String apiId) throws DatasourceManagerException {
172
		dsDao.deleteApi(null, apiId);
173
	}
174
175 49994 claudio.at
	public void dropCaches() {
176
		mongoLoggerClient.dropCache();
177
		isLookupClient.dropCache();
178
		vocabularyClient.dropCache();
179
	}
180
181 50143 claudio.at
	// HELPERS //////////////
182 49994 claudio.at
183 50143 claudio.at
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
184
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
185
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
186
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
187
		return dsInfo;
188 49994 claudio.at
	}
189
190
	private void getAggregationHistory(final String dsId,
191
			final CountDownLatch outerLatch,
192
			final Queue<Throwable> errors,
193
			final DatasourceInfo datasourceInfo) {
194
		Futures.addCallback(
195
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
196
				new FutureCallback<List<AggregationInfo>>() {
197
198
					public void onSuccess(List<AggregationInfo> info) {
199
						setAggregationHistory(datasourceInfo, info);
200
						outerLatch.countDown();
201
					}
202
203
					public void onFailure(Throwable e) {
204
						log.error(ExceptionUtils.getStackTrace(e));
205
						errors.offer(e);
206
						outerLatch.countDown();
207
					}
208
				}, executor);
209
	}
210
211 50143 claudio.at
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
212 49994 claudio.at
		datasourceInfo.setAggregationHistory(info);
213
		if (!info.isEmpty()) {
214
			datasourceInfo
215
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
216
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
217
		}
218
	}
219
220
	private void getIndexDsInfo(final String dsId,
221
			final CountDownLatch outerLatch,
222
			final Queue<Throwable> errors,
223
			final DatasourceInfo datasourceInfo) {
224
		Futures.addCallback(
225 50143 claudio.at
				executor.submit(() -> isLookupClient.calculateCurrentIndexDsInfo()),
226 49994 claudio.at
				new FutureCallback<IndexDsInfo>() {
227
228
					public void onSuccess(final IndexDsInfo info) {
229
230
						final CountDownLatch innerLatch = new CountDownLatch(2);
231
232
						Futures.addCallback(
233
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
234
								new FutureCallback<IndexRecordsInfo>() {
235
236
									public void onSuccess(IndexRecordsInfo info) {
237
										datasourceInfo
238
												.setIndexRecords(info.getTotal())
239
												.setFundedContent(info.getFunded())
240
												.setLastIndexingDate(info.getDate());
241
										innerLatch.countDown();
242
									}
243
244
									public void onFailure(Throwable e) {
245
										errors.offer(e);
246
										innerLatch.countDown();
247
									}
248
								}, executor);
249
250
						Futures.addCallback(
251
								executor.submit(() ->
252
										objectStoreClient.getObjectStoreSize(
253
												isLookupClient.getObjectStoreId(dsId, errors),
254
												errors)),
255
								new FutureCallback<Long>() {
256
									@Override
257
									public void onSuccess(final Long objectStoreSize) {
258
										datasourceInfo.setFulltexts(objectStoreSize);
259
										innerLatch.countDown();
260
									}
261
262
									@Override
263
									public void onFailure(final Throwable e) {
264
										errors.offer(e);
265
										innerLatch.countDown();
266
									}
267
								}, executor);
268
269
						waitLatch(innerLatch, errors, config.getRequestTimeout());
270
271
						outerLatch.countDown();
272
					}
273
274
					public void onFailure(final Throwable e) {
275 50143 claudio.at
						//log.error(ExceptionUtils.getStackTrace(e));
276 49994 claudio.at
						errors.offer(e);
277
						outerLatch.countDown();
278
					}
279
				}, executor);
280
	}
281
282
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
283
		try {
284
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
285
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
286
			}
287
		} catch (final InterruptedException e) {
288
			errors.offer(e);
289
		}
290
	}
291
292
}