Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource;
2

    
3
import java.util.List;
4
import java.util.Queue;
5
import java.util.concurrent.CountDownLatch;
6
import java.util.concurrent.ScheduledThreadPoolExecutor;
7
import java.util.concurrent.TimeUnit;
8
import java.util.concurrent.TimeoutException;
9
import java.util.stream.Collectors;
10
import javax.annotation.PostConstruct;
11

    
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Queues;
14
import com.google.common.util.concurrent.*;
15
import eu.dnetlib.OpenaireExporterConfig;
16
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
17
import eu.dnetlib.enabling.datasources.common.AggregationStage;
18
import eu.dnetlib.enabling.datasources.common.DatasourceManagerException;
19
import eu.dnetlib.openaire.exporter.datasource.clients.*;
20
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
21
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
22
import eu.dnetlib.openaire.exporter.model.ApiDetailsResponse;
23
import eu.dnetlib.openaire.exporter.model.ConversionUtils;
24
import eu.dnetlib.openaire.exporter.model.DatasourceResponse;
25
import eu.dnetlib.openaire.exporter.model.datasource.*;
26
import eu.dnetlib.openaire.exporter.model.datasource.db.ApiDbEntry;
27
import eu.dnetlib.openaire.exporter.model.datasource.db.DatasourceDbEntry;
28
import eu.dnetlib.openaire.exporter.vocabularies.Country;
29
import org.apache.commons.lang3.exception.ExceptionUtils;
30
import org.apache.commons.logging.Log;
31
import org.apache.commons.logging.LogFactory;
32
import org.springframework.beans.factory.annotation.Autowired;
33
import org.springframework.data.domain.Page;
34
import org.springframework.stereotype.Component;
35

    
36
import static eu.dnetlib.openaire.exporter.datasource.clients.ResponseUtils.apiResponse;
37
import static eu.dnetlib.openaire.exporter.datasource.clients.ResponseUtils.datasourceResponse;
38

    
39
@Component
40
public class DatasourceManagerCore {
41

    
42
	private static final Log log = LogFactory.getLog(DatasourceManagerCore.class);
43

    
44
	@Autowired
45
	private MongoLoggerClient mongoLoggerClient;
46

    
47
	@Autowired
48
	private ISLookupClient isLookupClient;
49

    
50
	@Autowired
51
	private ObjectStoreClient objectStoreClient;
52

    
53
	@Autowired
54
	private DatasourceIndexClient datasourceIndexClient;
55

    
56
	@Autowired
57
	private VocabularyClient vocabularyClient;
58

    
59
	@Autowired
60
	private DatasourceDao dsDao;
61

    
62
	@Autowired
63
	private OpenaireExporterConfig config;
64

    
65
	private ListeningExecutorService executor;
66

    
67
	@PostConstruct
68
	public void init() {
69
		executor = MoreExecutors.listeningDecorator(
70
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
71
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
72
	}
73

    
74
	public List<Country> listCountries() throws DatasourceManagerException {
75
		return dsDao.listCountries();
76
	}
77

    
78
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
79
			throws DatasourceManagerException {
80
		final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
81
		final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
82
		final CountDownLatch outerLatch = new CountDownLatch(2);
83

    
84
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
85
		if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
86
			dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(ConversionUtils.asDetails(d), outerLatch, errors)));
87
			waitLatch(outerLatch, errors, config.getRequestTimeout());
88
		}
89

    
90
		return datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
91
	}
92

    
93
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DatasourceManagerException {
94
		return dsDao.findBaseURLs(requestFilter, page, size);
95
	}
96

    
97
	public ApiDetailsResponse getApis(final String dsId) throws DatasourceManagerException {
98
		final List<ApiDbEntry> apis = dsDao.getApis(dsId);
99
		final List<ApiDetails> api = apis.stream()
100
				.map(ConversionUtils::asDetails)
101
				.collect(Collectors.toList());
102
		return apiResponse(api, api.size());
103
	}
104

    
105
	public void setManaged(final String dsId, final boolean managed) throws DatasourceManagerException {
106
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
107
		dsDao.setManaged(dsId, managed);
108
	}
109

    
110
	public boolean isManaged(final String dsId) throws DatasourceManagerException {
111
		return dsDao.isManaged(dsId);
112
	}
113

    
114
	public boolean exist(final DatasourceDetails d) throws DatasourceManagerException {
115
		return ((DatasourceDaoImpl) dsDao).existDs(d.getId());
116
	}
117

    
118
	public void save(final DatasourceDetails d) throws DatasourceManagerException {
119
		dsDao.saveDs(ConversionUtils.asDbEntry(d));
120

    
121
		//TODO add also to the IS
122
	}
123

    
124
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DatasourceManagerException {
125
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
126
		dsDao.updateDatasourceName(dsId, officialname, englishname);
127

    
128
		//TODO update also to the IS
129
	}
130

    
131
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DatasourceManagerException {
132
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
133
		dsDao.updateCoordinates(dsId, latitude, longitude);
134

    
135
		//TODO update also to the IS
136
	}
137

    
138
	public void updateApiBaseurl(final String apiId, final String baseUrl) throws DatasourceManagerException {
139
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
140
		dsDao.updateBaseUrl(apiId, baseUrl);
141

    
142
		//TODO update also to the IS
143
	}
144

    
145
	public void updateTimezone(final String dsId, final String timezone) throws DatasourceManagerException {
146
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
147
		dsDao.updateTimezone(dsId, timezone);
148

    
149
		//TODO update also to the IS
150
	}
151

    
152
	public void updateDsTypology(final String dsId, final String typology) throws DatasourceManagerException {
153
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
154
		dsDao.updateTypology(dsId, typology);
155

    
156
		//TODO update also to the IS
157
	}
158

    
159
	public void updateApiCompatibility(final String apiId, final String compliance, final boolean override) throws DatasourceManagerException {
160
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
161
		dsDao.updateCompliance(null, apiId, compliance, override);
162

    
163
		//TODO update also to the IS
164
	}
165

    
166
	public void deleteDs(final String dsId) throws DatasourceManagerException {
167
		log.info(String.format("deleted datasource '%s'", dsId));
168
		dsDao.deleteDs(dsId);
169
	}
170

    
171
	public void deleteApi(final String apiId) throws DatasourceManagerException {
172
		dsDao.deleteApi(null, apiId);
173
	}
174

    
175
	public void dropCaches() {
176
		mongoLoggerClient.dropCache();
177
		isLookupClient.dropCache();
178
		vocabularyClient.dropCache();
179
	}
180

    
181
	// HELPERS //////////////
182

    
183
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
184
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
185
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
186
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
187
		return dsInfo;
188
	}
189

    
190
	private void getAggregationHistory(final String dsId,
191
			final CountDownLatch outerLatch,
192
			final Queue<Throwable> errors,
193
			final DatasourceInfo datasourceInfo) {
194
		Futures.addCallback(
195
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
196
				new FutureCallback<List<AggregationInfo>>() {
197

    
198
					public void onSuccess(List<AggregationInfo> info) {
199
						setAggregationHistory(datasourceInfo, info);
200
						outerLatch.countDown();
201
					}
202

    
203
					public void onFailure(Throwable e) {
204
						log.error(ExceptionUtils.getStackTrace(e));
205
						errors.offer(e);
206
						outerLatch.countDown();
207
					}
208
				}, executor);
209
	}
210

    
211
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
212
		datasourceInfo.setAggregationHistory(info);
213
		if (!info.isEmpty()) {
214
			datasourceInfo
215
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
216
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
217
		}
218
	}
219

    
220
	private void getIndexDsInfo(final String dsId,
221
			final CountDownLatch outerLatch,
222
			final Queue<Throwable> errors,
223
			final DatasourceInfo datasourceInfo) {
224
		Futures.addCallback(
225
				executor.submit(() -> isLookupClient.calculateCurrentIndexDsInfo()),
226
				new FutureCallback<IndexDsInfo>() {
227

    
228
					public void onSuccess(final IndexDsInfo info) {
229

    
230
						final CountDownLatch innerLatch = new CountDownLatch(2);
231

    
232
						Futures.addCallback(
233
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
234
								new FutureCallback<IndexRecordsInfo>() {
235

    
236
									public void onSuccess(IndexRecordsInfo info) {
237
										datasourceInfo
238
												.setIndexRecords(info.getTotal())
239
												.setFundedContent(info.getFunded())
240
												.setLastIndexingDate(info.getDate());
241
										innerLatch.countDown();
242
									}
243

    
244
									public void onFailure(Throwable e) {
245
										errors.offer(e);
246
										innerLatch.countDown();
247
									}
248
								}, executor);
249

    
250
						Futures.addCallback(
251
								executor.submit(() ->
252
										objectStoreClient.getObjectStoreSize(
253
												isLookupClient.getObjectStoreId(dsId, errors),
254
												errors)),
255
								new FutureCallback<Long>() {
256
									@Override
257
									public void onSuccess(final Long objectStoreSize) {
258
										datasourceInfo.setFulltexts(objectStoreSize);
259
										innerLatch.countDown();
260
									}
261

    
262
									@Override
263
									public void onFailure(final Throwable e) {
264
										errors.offer(e);
265
										innerLatch.countDown();
266
									}
267
								}, executor);
268

    
269
						waitLatch(innerLatch, errors, config.getRequestTimeout());
270

    
271
						outerLatch.countDown();
272
					}
273

    
274
					public void onFailure(final Throwable e) {
275
						//log.error(ExceptionUtils.getStackTrace(e));
276
						errors.offer(e);
277
						outerLatch.countDown();
278
					}
279
				}, executor);
280
	}
281

    
282
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
283
		try {
284
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
285
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
286
			}
287
		} catch (final InterruptedException e) {
288
			errors.offer(e);
289
		}
290
	}
291

    
292
}
(1-1/2)