Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource;
2

    
3
import java.util.List;
4
import java.util.Map;
5
import java.util.Queue;
6
import java.util.concurrent.CountDownLatch;
7
import java.util.concurrent.ScheduledThreadPoolExecutor;
8
import java.util.concurrent.TimeUnit;
9
import java.util.concurrent.TimeoutException;
10
import java.util.stream.Collectors;
11
import javax.annotation.PostConstruct;
12

    
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Queues;
16
import com.google.common.util.concurrent.*;
17
import com.google.common.xml.XmlEscapers;
18
import eu.dnetlib.OpenaireExporterConfig;
19
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
20
import eu.dnetlib.enabling.datasources.common.AggregationStage;
21
import eu.dnetlib.enabling.datasources.common.DsmException;
22
import eu.dnetlib.enabling.datasources.common.DsmForbiddenException;
23
import eu.dnetlib.openaire.exporter.datasource.clients.*;
24
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
25
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
26
import eu.dnetlib.openaire.exporter.model.ApiDetailsResponse;
27
import eu.dnetlib.openaire.exporter.model.ConversionUtils;
28
import eu.dnetlib.openaire.exporter.model.DatasourceResponse;
29
import eu.dnetlib.openaire.exporter.model.datasource.*;
30
import eu.dnetlib.openaire.exporter.model.datasource.db.ApiDbEntry;
31
import eu.dnetlib.openaire.exporter.model.datasource.db.DatasourceDbEntry;
32
import eu.dnetlib.openaire.exporter.vocabularies.Country;
33
import org.apache.commons.lang3.StringUtils;
34
import org.apache.commons.lang3.exception.ExceptionUtils;
35
import org.apache.commons.logging.Log;
36
import org.apache.commons.logging.LogFactory;
37
import org.springframework.beans.factory.annotation.Autowired;
38
import org.springframework.data.domain.Page;
39
import org.springframework.stereotype.Component;
40

    
41
import static eu.dnetlib.openaire.exporter.datasource.clients.ResponseUtils.apiResponse;
42
import static eu.dnetlib.openaire.exporter.datasource.clients.ResponseUtils.datasourceResponse;
43

    
44
@Component
45
public class DatasourceManagerCore {
46

    
47
	private static final Log log = LogFactory.getLog(DatasourceManagerCore.class);
48

    
49
	@Autowired
50
	private MongoLoggerClient mongoLoggerClient;
51

    
52
	@Autowired
53
	private ISClient isClient;
54

    
55
	@Autowired
56
	private ObjectStoreClient objectStoreClient;
57

    
58
	@Autowired
59
	private DatasourceIndexClient datasourceIndexClient;
60

    
61
	@Autowired
62
	private VocabularyClient vocabularyClient;
63

    
64
	@Autowired
65
	private DatasourceDao dsDao;
66

    
67
	@Autowired
68
	private OpenaireExporterConfig config;
69

    
70
	private ListeningExecutorService executor;
71

    
72
	@PostConstruct
73
	public void init() {
74
		executor = MoreExecutors.listeningDecorator(
75
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
76
						new ThreadFactoryBuilder().setNameFormat("dsm-client-%d").build()));
77
	}
78

    
79
	public List<Country> listCountries() throws DsmException {
80
		return dsDao.listCountries();
81
	}
82

    
83
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
84
			throws DsmException {
85
		final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
86
		final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
87
		final CountDownLatch outerLatch = new CountDownLatch(2);
88

    
89
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
90
		if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
91
			dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(ConversionUtils.asDetails(d), outerLatch, errors)));
92
			waitLatch(outerLatch, errors, config.getRequestTimeout());
93
		}
94

    
95
		return datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
96
	}
97

    
98
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DsmException {
99
		return dsDao.findApiBaseURLs(requestFilter, page, size);
100
	}
101

    
102
	public ApiDetailsResponse getApis(final String dsId) throws DsmException {
103
		final List<ApiDbEntry> apis = dsDao.getApis(dsId);
104
		final List<ApiDetails> api = apis.stream()
105
				.map(ConversionUtils::asDetails)
106
				.collect(Collectors.toList());
107
		return apiResponse(api, api.size());
108
	}
109

    
110
	public void setManaged(final String dsId, final boolean managed) throws DsmException {
111
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
112
		dsDao.setManaged(dsId, managed);
113
	}
114

    
115
	public boolean isManaged(final String dsId) throws DsmException {
116
		return dsDao.isManaged(dsId);
117
	}
118

    
119
	public boolean exist(final DatasourceDetails d) throws DsmException {
120
		return ((DatasourceDaoImpl) dsDao).existDs(d.getId());
121
	}
122

    
123
	public void save(final DatasourceDetails d) throws DsmException {
124
		dsDao.saveDs(ConversionUtils.asDbEntry(d));
125
		isClient.registerDS(d);
126
	}
127

    
128
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DsmException {
129
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
130
		dsDao.updateName(dsId, officialname, englishname);
131

    
132
		final Map<String, String> changes = Maps.newHashMap();
133
		changes.put("//CONFIGURATION/OFFICIAL_NAME", XmlEscapers.xmlContentEscaper().escape(officialname));
134
		changes.put("//CONFIGURATION/ENGLISH_NAME", XmlEscapers.xmlContentEscaper().escape(englishname));
135
		isClient.updateDatasourceFields(dsId, changes);
136
	}
137

    
138
	public void updateDatasourceLogoUrl(final String dsId, final String logourl) throws DsmException {
139
		log.info(String.format("updated datasource '%s' with logo URL '%s'", dsId, logourl));
140

    
141
		dsDao.updateLogoUrl(dsId, logourl);
142
	}
143

    
144
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DsmException {
145
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
146
		dsDao.updateCoordinates(dsId, latitude, longitude);
147

    
148
		final Map<String, String> changes = Maps.newHashMap();
149
		changes.put("//CONFIGURATION/LOCATION/LATITUDE", XmlEscapers.xmlContentEscaper().escape(String.valueOf(latitude)));
150
		changes.put("//CONFIGURATION/LOCATION/LONGITUDE", XmlEscapers.xmlContentEscaper().escape(String.valueOf(longitude)));
151
		isClient.updateDatasourceFields(dsId, changes);
152
	}
153

    
154
	public void updateTimezone(final String dsId, final String timezone) throws DsmException {
155
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
156
		dsDao.updateTimezone(dsId, timezone);
157

    
158
		final Map<String, String> changes = Maps.newHashMap();
159
		changes.put("//CONFIGURATION/LOCATION/TIMEZONE", XmlEscapers.xmlContentEscaper().escape(timezone));
160
		isClient.updateDatasourceFields(dsId, changes);
161
	}
162

    
163
	public void updateDsTypology(final String dsId, final String typology) throws DsmException {
164
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
165
		dsDao.updateTypology(dsId, typology);
166

    
167
		final Map<String, String> changes = Maps.newHashMap();
168
		changes.put("//CONFIGURATION/DATASOURCE_TYPE", XmlEscapers.xmlContentEscaper().escape(typology));
169
		isClient.updateDatasourceFields(dsId, changes);
170
	}
171

    
172
	public void updateDsRegisteringUser(final String dsId, final String registeredBy) throws DsmException {
173
		log.info(String.format("updated datasource '%s' registering user with '%s'", dsId, registeredBy));
174
		dsDao.updateRegisteringUser(dsId, registeredBy);
175
	}
176

    
177
	public void updateDsPlatform(final String dsId, final String platform) throws DsmException {
178
		log.info(String.format("updated datasource '%s' platform with '%s'", dsId, platform));
179
		dsDao.updatePlatform(dsId, platform);
180

    
181
		final Map<String, String> changes = Maps.newHashMap();
182
		changes.put("//CONFIGURATION/TYPOLOGY", XmlEscapers.xmlContentEscaper().escape(platform)); // this is not a typo, Repository profiles map the platform in the TYPOLOGY field.
183
		isClient.updateDatasourceFields(dsId, changes);
184
	}
185

    
186
	//TODO remove if unused
187
	public void deleteDs(final String dsId) throws DsmException {
188
		log.info(String.format("deleted datasource '%s'", dsId));
189
		dsDao.deleteDs(dsId);
190
	}
191

    
192
	// API
193

    
194
	public void updateApiBaseurl(final String dsId, final String apiId, final String baseUrl) throws DsmException {
195
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
196
		dsDao.updateApiBaseUrl(apiId, baseUrl);
197

    
198
		final Map<String, String> changes = Maps.newHashMap();
199
		changes.put("/BASE_URL", XmlEscapers.xmlContentEscaper().escape(baseUrl));
200

    
201
		isClient.updateAPIField(dsId, apiId, changes);
202
	}
203

    
204
	public void updateApiCompatibility(final String dsId, final String apiId, final String compliance, final boolean override) throws DsmException {
205
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
206
		dsDao.updateCompliance(null, apiId, compliance, override);
207

    
208
		final Map<String, String> changes = Maps.newHashMap();
209
		changes.put("/@compliance", XmlEscapers.xmlAttributeEscaper().escape(compliance));
210

    
211
		isClient.updateAPIField(dsId, apiId, changes);
212
	}
213

    
214
	public void addApi(final ApiDetails api) throws DsmException {
215
		if (StringUtils.isBlank(api.getId())) {
216
			api.setId(ConversionUtils.createId(api));
217
			log.info(String.format("missing api id, created '%s'", api.getId()));
218
		}
219

    
220
		dsDao.addApi(ConversionUtils.asDbEntry(api));
221
		isClient.registerAPI(api);
222
	}
223

    
224
	public void deleteApi(final String apiId) throws DsmForbiddenException, DsmException {
225
		//TODO handle the api removal in case of associated workflows.
226
		isClient.removeAPI(apiId);
227
		dsDao.deleteApi(null, apiId);
228
	}
229

    
230
	public void dropCaches() {
231
		mongoLoggerClient.dropCache();
232
		isClient.dropCache();
233
		vocabularyClient.dropCache();
234
	}
235

    
236
	// HELPERS //////////////
237

    
238
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
239
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
240
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
241
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
242
		return dsInfo;
243
	}
244

    
245
	private void getAggregationHistory(final String dsId,
246
			final CountDownLatch outerLatch,
247
			final Queue<Throwable> errors,
248
			final DatasourceInfo datasourceInfo) {
249
		Futures.addCallback(
250
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
251
				new FutureCallback<List<AggregationInfo>>() {
252

    
253
					public void onSuccess(List<AggregationInfo> info) {
254
						setAggregationHistory(datasourceInfo, info);
255
						outerLatch.countDown();
256
					}
257

    
258
					public void onFailure(Throwable e) {
259
						log.error(ExceptionUtils.getStackTrace(e));
260
						errors.offer(e);
261
						outerLatch.countDown();
262
					}
263
				}, executor);
264
	}
265

    
266
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
267
		datasourceInfo.setAggregationHistory(info);
268
		if (!info.isEmpty()) {
269
			datasourceInfo
270
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
271
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
272
		}
273
	}
274

    
275
	private void getIndexDsInfo(final String dsId,
276
			final CountDownLatch outerLatch,
277
			final Queue<Throwable> errors,
278
			final DatasourceInfo datasourceInfo) {
279
		Futures.addCallback(
280
				executor.submit(() -> isClient.calculateCurrentIndexDsInfo()),
281
				new FutureCallback<IndexDsInfo>() {
282

    
283
					public void onSuccess(final IndexDsInfo info) {
284

    
285
						final CountDownLatch innerLatch = new CountDownLatch(2);
286

    
287
						Futures.addCallback(
288
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
289
								new FutureCallback<IndexRecordsInfo>() {
290

    
291
									public void onSuccess(IndexRecordsInfo info) {
292
										datasourceInfo
293
												.setIndexRecords(info.getTotal())
294
												.setFundedContent(info.getFunded())
295
												.setLastIndexingDate(info.getDate());
296
										innerLatch.countDown();
297
									}
298

    
299
									public void onFailure(Throwable e) {
300
										errors.offer(e);
301
										innerLatch.countDown();
302
									}
303
								}, executor);
304

    
305
						Futures.addCallback(
306
								executor.submit(() ->
307
										objectStoreClient.getObjectStoreSize(
308
												isClient.getObjectStoreId(dsId, errors),
309
												errors)),
310
								new FutureCallback<Long>() {
311
									@Override
312
									public void onSuccess(final Long objectStoreSize) {
313
										datasourceInfo.setFulltexts(objectStoreSize);
314
										innerLatch.countDown();
315
									}
316

    
317
									@Override
318
									public void onFailure(final Throwable e) {
319
										errors.offer(e);
320
										innerLatch.countDown();
321
									}
322
								}, executor);
323

    
324
						waitLatch(innerLatch, errors, config.getRequestTimeout());
325

    
326
						outerLatch.countDown();
327
					}
328

    
329
					public void onFailure(final Throwable e) {
330
						//log.error(ExceptionUtils.getStackTrace(e));
331
						errors.offer(e);
332
						outerLatch.countDown();
333
					}
334
				}, executor);
335
	}
336

    
337
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
338
		try {
339
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
340
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
341
			}
342
		} catch (final InterruptedException e) {
343
			errors.offer(e);
344
		}
345
	}
346

    
347
}
(1-1/2)