Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource;
2

    
3
import java.util.List;
4
import java.util.Queue;
5
import java.util.concurrent.CountDownLatch;
6
import java.util.concurrent.ScheduledThreadPoolExecutor;
7
import java.util.concurrent.TimeUnit;
8
import java.util.concurrent.TimeoutException;
9
import java.util.stream.Collectors;
10
import javax.annotation.PostConstruct;
11

    
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Queues;
14
import com.google.common.util.concurrent.*;
15
import eu.dnetlib.OpenaireExporterConfig;
16
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
17
import eu.dnetlib.enabling.datasources.common.AggregationStage;
18
import eu.dnetlib.enabling.datasources.common.DatasourceManagerException;
19
import eu.dnetlib.openaire.exporter.datasource.clients.*;
20
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
21
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
22
import eu.dnetlib.openaire.exporter.model.ApiDetailsResponse;
23
import eu.dnetlib.openaire.exporter.model.ConversionUtils;
24
import eu.dnetlib.openaire.exporter.model.DatasourceResponse;
25
import eu.dnetlib.openaire.exporter.model.datasource.*;
26
import eu.dnetlib.openaire.exporter.model.datasource.db.ApiDbEntry;
27
import eu.dnetlib.openaire.exporter.model.datasource.db.DatasourceDbEntry;
28
import eu.dnetlib.openaire.exporter.vocabularies.Country;
29
import org.apache.commons.lang3.exception.ExceptionUtils;
30
import org.apache.commons.logging.Log;
31
import org.apache.commons.logging.LogFactory;
32
import org.springframework.beans.factory.annotation.Autowired;
33
import org.springframework.data.domain.Page;
34
import org.springframework.stereotype.Component;
35

    
36
import static eu.dnetlib.openaire.exporter.datasource.clients.ResponseUtils.apiResponse;
37
import static eu.dnetlib.openaire.exporter.datasource.clients.ResponseUtils.datasourceResponse;
38

    
39
@Component
40
public class DatasourceManagerCore {
41

    
42
	private static final Log log = LogFactory.getLog(DatasourceManagerCore.class);
43

    
44
	@Autowired
45
	private MongoLoggerClient mongoLoggerClient;
46

    
47
	@Autowired
48
	private ISLookupClient isLookupClient;
49

    
50
	@Autowired
51
	private ObjectStoreClient objectStoreClient;
52

    
53
	@Autowired
54
	private DatasourceIndexClient datasourceIndexClient;
55

    
56
	@Autowired
57
	private VocabularyClient vocabularyClient;
58

    
59
	@Autowired
60
	private DatasourceDao dsDao;
61

    
62
	@Autowired
63
	private OpenaireExporterConfig config;
64

    
65
	private ListeningExecutorService executor;
66

    
67
	@PostConstruct
68
	public void init() {
69
		executor = MoreExecutors.listeningDecorator(
70
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
71
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
72
	}
73

    
74
	public List<Country> listCountries() throws DatasourceManagerException {
75
		return dsDao.listCountries();
76
	}
77

    
78
	public List<String> listDatasourceIds(final int page, final int size) throws DatasourceManagerException {
79
		return dsDao.listIds(page, size);
80
	}
81

    
82
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
83
			throws DatasourceManagerException {
84
		final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
85
		final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
86
		final CountDownLatch outerLatch = new CountDownLatch(2);
87

    
88
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
89
		if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
90
			dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(ConversionUtils.asDetails(d), outerLatch, errors)));
91
			waitLatch(outerLatch, errors, config.getRequestTimeout());
92
		}
93

    
94
		return datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
95
	}
96

    
97
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DatasourceManagerException {
98
		return dsDao.findBaseURLs(requestFilter, page, size);
99
	}
100

    
101
	public ApiDetailsResponse getApis(final String dsId) throws DatasourceManagerException {
102
		final List<ApiDbEntry> apis = dsDao.getApis(dsId);
103
		final List<ApiDetails> api = apis.stream()
104
				.map(ConversionUtils::asDetails)
105
				.collect(Collectors.toList());
106
		return apiResponse(api, api.size());
107
	}
108

    
109
	public void setManaged(final String dsId, final boolean managed) throws DatasourceManagerException {
110
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
111
		dsDao.setManaged(dsId, managed);
112
	}
113

    
114
	public boolean isManaged(final String dsId) throws DatasourceManagerException {
115
		return dsDao.isManaged(dsId);
116
	}
117

    
118
	public boolean exist(final DatasourceDetails d) throws DatasourceManagerException {
119
		return ((DatasourceDaoImpl) dsDao).existDs(d.getId());
120
	}
121

    
122
	public void save(final DatasourceDetails d) throws DatasourceManagerException {
123
		dsDao.saveDs(ConversionUtils.asDbEntry(d));
124

    
125
		//TODO add also to the IS
126
	}
127

    
128
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DatasourceManagerException {
129
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
130
		dsDao.updateDatasourceName(dsId, officialname, englishname);
131

    
132
		//TODO update also to the IS
133
	}
134

    
135
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DatasourceManagerException {
136
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
137
		dsDao.updateCoordinates(dsId, latitude, longitude);
138

    
139
		//TODO update also to the IS
140
	}
141

    
142
	public void updateApiBaseurl(final String apiId, final String baseUrl) throws DatasourceManagerException {
143
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
144
		dsDao.updateBaseUrl(apiId, baseUrl);
145

    
146
		//TODO update also to the IS
147
	}
148

    
149
	public void updateTimezone(final String dsId, final String timezone) throws DatasourceManagerException {
150
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
151
		dsDao.updateTimezone(dsId, timezone);
152

    
153
		//TODO update also to the IS
154
	}
155

    
156
	public void updateDsTypology(final String dsId, final String typology) throws DatasourceManagerException {
157
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
158
		dsDao.updateTypology(dsId, typology);
159

    
160
		//TODO update also to the IS
161
	}
162

    
163
	public void updateApiCompatibility(final String apiId, final String compliance, final boolean override) throws DatasourceManagerException {
164
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
165
		dsDao.updateCompliance(null, apiId, compliance, override);
166

    
167
		//TODO update also to the IS
168
	}
169

    
170
	public void deleteDs(final String dsId) throws DatasourceManagerException {
171
		log.info(String.format("deleted datasource '%s'", dsId));
172
		dsDao.deleteDs(dsId);
173
	}
174

    
175
	public void deleteApi(final String apiId) throws DatasourceManagerException {
176
		dsDao.deleteApi(null, apiId);
177
	}
178

    
179
	public void dropCaches() {
180
		mongoLoggerClient.dropCache();
181
		isLookupClient.dropCache();
182
		vocabularyClient.dropCache();
183
	}
184

    
185
	// HELPERS //////////////
186

    
187
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
188
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
189
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
190
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
191
		return dsInfo;
192
	}
193

    
194
	private void getAggregationHistory(final String dsId,
195
			final CountDownLatch outerLatch,
196
			final Queue<Throwable> errors,
197
			final DatasourceInfo datasourceInfo) {
198
		Futures.addCallback(
199
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
200
				new FutureCallback<List<AggregationInfo>>() {
201

    
202
					public void onSuccess(List<AggregationInfo> info) {
203
						setAggregationHistory(datasourceInfo, info);
204
						outerLatch.countDown();
205
					}
206

    
207
					public void onFailure(Throwable e) {
208
						log.error(ExceptionUtils.getStackTrace(e));
209
						errors.offer(e);
210
						outerLatch.countDown();
211
					}
212
				}, executor);
213
	}
214

    
215
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
216
		datasourceInfo.setAggregationHistory(info);
217
		if (!info.isEmpty()) {
218
			datasourceInfo
219
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
220
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
221
		}
222
	}
223

    
224
	private void getIndexDsInfo(final String dsId,
225
			final CountDownLatch outerLatch,
226
			final Queue<Throwable> errors,
227
			final DatasourceInfo datasourceInfo) {
228
		Futures.addCallback(
229
				executor.submit(() -> isLookupClient.calculateCurrentIndexDsInfo()),
230
				new FutureCallback<IndexDsInfo>() {
231

    
232
					public void onSuccess(final IndexDsInfo info) {
233

    
234
						final CountDownLatch innerLatch = new CountDownLatch(2);
235

    
236
						Futures.addCallback(
237
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
238
								new FutureCallback<IndexRecordsInfo>() {
239

    
240
									public void onSuccess(IndexRecordsInfo info) {
241
										datasourceInfo
242
												.setIndexRecords(info.getTotal())
243
												.setFundedContent(info.getFunded())
244
												.setLastIndexingDate(info.getDate());
245
										innerLatch.countDown();
246
									}
247

    
248
									public void onFailure(Throwable e) {
249
										errors.offer(e);
250
										innerLatch.countDown();
251
									}
252
								}, executor);
253

    
254
						Futures.addCallback(
255
								executor.submit(() ->
256
										objectStoreClient.getObjectStoreSize(
257
												isLookupClient.getObjectStoreId(dsId, errors),
258
												errors)),
259
								new FutureCallback<Long>() {
260
									@Override
261
									public void onSuccess(final Long objectStoreSize) {
262
										datasourceInfo.setFulltexts(objectStoreSize);
263
										innerLatch.countDown();
264
									}
265

    
266
									@Override
267
									public void onFailure(final Throwable e) {
268
										errors.offer(e);
269
										innerLatch.countDown();
270
									}
271
								}, executor);
272

    
273
						waitLatch(innerLatch, errors, config.getRequestTimeout());
274

    
275
						outerLatch.countDown();
276
					}
277

    
278
					public void onFailure(final Throwable e) {
279
						//log.error(ExceptionUtils.getStackTrace(e));
280
						errors.offer(e);
281
						outerLatch.countDown();
282
					}
283
				}, executor);
284
	}
285

    
286
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
287
		try {
288
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
289
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
290
			}
291
		} catch (final InterruptedException e) {
292
			errors.offer(e);
293
		}
294
	}
295

    
296
}
(1-1/2)