Project

General

Profile

1
package eu.dnetlib.openaire.dsm;
2

    
3
import java.util.*;
4
import java.util.concurrent.CountDownLatch;
5
import java.util.concurrent.ScheduledThreadPoolExecutor;
6
import java.util.concurrent.TimeUnit;
7
import java.util.concurrent.TimeoutException;
8
import java.util.function.Function;
9
import java.util.stream.Collectors;
10
import java.util.stream.Stream;
11
import javax.annotation.Nullable;
12
import javax.annotation.PostConstruct;
13

    
14
import com.google.common.collect.Lists;
15
import com.google.common.collect.Maps;
16
import com.google.common.collect.Queues;
17
import com.google.common.util.concurrent.*;
18
import com.google.common.xml.XmlEscapers;
19
import eu.dnetlib.OpenaireExporterConfig;
20
import eu.dnetlib.enabling.datasources.common.*;
21
import eu.dnetlib.openaire.common.ISClient;
22
import eu.dnetlib.openaire.dsm.dao.*;
23
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
24
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
25
import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo;
26
import eu.dnetlib.openaire.dsm.domain.*;
27
import eu.dnetlib.openaire.dsm.domain.db.ApiDbEntry;
28
import eu.dnetlib.openaire.dsm.domain.db.DatasourceDbEntry;
29
import eu.dnetlib.openaire.dsm.domain.db.IdentityDbEntry;
30
import eu.dnetlib.openaire.vocabularies.Country;
31
import org.apache.commons.lang3.StringUtils;
32
import org.apache.commons.lang3.exception.ExceptionUtils;
33
import org.apache.commons.logging.Log;
34
import org.apache.commons.logging.LogFactory;
35
import org.springframework.beans.factory.annotation.Autowired;
36
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
37
import org.springframework.data.domain.Page;
38
import org.springframework.stereotype.Component;
39

    
40
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.*;
41
import static eu.dnetlib.openaire.common.ExporterConstants.*;
42

    
43
@Component
44
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
45
public class DsmCore {
46

    
47
	private static final Log log = LogFactory.getLog(DsmCore.class);
48

    
49
	@Autowired
50
	private MongoLoggerClient mongoLoggerClient;
51

    
52
	@Autowired
53
	private ISClient isClient;
54

    
55
	@Autowired
56
	private ObjectStoreClient objectStoreClient;
57

    
58
	@Autowired
59
	private DatasourceIndexClient datasourceIndexClient;
60

    
61
	@Autowired
62
	private VocabularyClient vocabularyClient;
63

    
64
	@Autowired
65
	private DatasourceDao dsDao;
66

    
67
	@Autowired
68
	private OpenaireExporterConfig config;
69

    
70
	private ListeningExecutorService executor;
71

    
72
	@PostConstruct
73
	public void init() {
74
		executor = MoreExecutors.listeningDecorator(
75
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
76
						new ThreadFactoryBuilder().setNameFormat("dsm-client-%d").build()));
77
	}
78

    
79
	public List<Country> listCountries() throws DsmException {
80
		try {
81
			return dsDao.listCountries();
82
		} catch (Throwable e) {
83
			log.error("error listing countries", e);
84
			throw e;
85
		}
86
	}
87

    
88
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
89
			throws DsmException {
90

    
91
		try {
92
			final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
93
			final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
94
			final CountDownLatch outerLatch = new CountDownLatch(2);
95

    
96
			final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
97
			if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
98
				dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(asDetails(d), outerLatch, errors)));
99
				waitLatch(outerLatch, errors, config.getRequestTimeout());
100
			}
101

    
102
			if (!errors.isEmpty()) {
103
				//TODO report on error metrics
104
				errors.forEach(e -> log.error(e));
105
			}
106
			return ResponseUtils.datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
107
		} catch (Throwable e) {
108
			log.error("error searching datasources", e);
109
			throw e;
110
		}
111
	}
112

    
113
	public DatasourceResponse searchSnippet(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
114
			throws DsmException {
115
		try {
116
			final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
117
			return ResponseUtils.datasourceResponse(dsPage.map(d -> asSnippet(d)).getContent(), Queues.newLinkedBlockingQueue(), dsPage.getTotalElements());
118
		} catch (Throwable e) {
119
			log.error("error searching datasources", e);
120
			throw e;
121
		}
122
	}
123

    
124
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DsmException {
125
		try {
126
			return dsDao.findApiBaseURLs(requestFilter, page, size);
127
		} catch (Throwable e) {
128
			log.error("error searching datasource base urls", e);
129
			throw e;
130
		}
131
	}
132

    
133
	public ApiDetailsResponse getApis(final String dsId) throws DsmException {
134
		try {
135
			final List<ApiDbEntry> apis = dsDao.getApis(dsId);
136
			final List<ApiDetails> api = apis.stream()
137
					.map(DsmMappingUtils::asDetails)
138
					.collect(Collectors.toList());
139
			return ResponseUtils.apiResponse(api, api.size());
140
		} catch (Throwable e) {
141
			log.error(String.format("error searching datasource api %s", dsId), e);
142
			throw e;
143
		}
144
	}
145

    
146
	public void setManaged(final String dsId, final boolean managed) throws DsmException {
147
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
148
		dsDao.setManaged(dsId, managed);
149
	}
150

    
151
	public boolean isManaged(final String dsId) throws DsmException {
152
		return dsDao.isManaged(dsId);
153
	}
154

    
155
	public boolean exist(final DatasourceDetails d) throws DsmException {
156
		return dsDao.existDs(d.getId());
157
	}
158

    
159
	public void save(final DatasourceDetails d) throws DsmException {
160
		try {
161
			dsDao.saveDs(asDbEntry(d));
162
			isClient.registerDS(d);
163
		} catch(Throwable e) {
164
			log.error(ExceptionUtils.getStackTrace(e));
165
			throw e;
166
		}
167
	}
168

    
169
	public void updateDatasource(final DatasourceDetailsUpdate d) throws DsmException, DsmNotFoundException {
170
		try {
171
			final Datasource ds = dsDao.getDs(d.getId());
172
			final DatasourceDbEntry dbEntry = (DatasourceDbEntry) ds;
173

    
174
			if (dbEntry == null) {
175
				throw new DsmNotFoundException(String.format("ds '%s' does not exist", d.getId()));
176
			}
177

    
178
			final DatasourceDbEntry update = asDbEntry(d);
179
			if (d.getIdentities() != null) {
180
				final Set<IdentityDbEntry> identities = (
181
						new HashSet<>(
182
								Stream.of(
183
										update.getIdentities(),
184
										dbEntry.getIdentities())
185
										.flatMap(Collection::stream)
186
										.collect(
187
												Collectors.toMap(
188
														i -> i.getIssuertype() + i.getPid(),
189
														Function.identity(),
190
														(i1, i2) -> i1))
191
										.values()));
192
				copyNonNullProperties(update, dbEntry);
193
				dbEntry.setIdentities(identities);
194
			} else {
195
				copyNonNullProperties(update, dbEntry);
196
			}
197

    
198
			dsDao.saveDs(dbEntry);
199
			isClient.updateDatasourceFields(d.getId(), asMapOfChanges(d));
200
		} catch (Throwable e) {
201
			log.error(ExceptionUtils.getStackTrace(e));
202
			throw e;
203
		}
204
	}
205

    
206
	@Deprecated
207
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DsmException {
208
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
209
		dsDao.updateName(dsId, officialname, englishname);
210

    
211
		final Map<String, String> changes = Maps.newHashMap();
212
		changes.put(OFFICIAL_NAME, XmlEscapers.xmlContentEscaper().escape(officialname));
213
		changes.put(ENGLISH_NAME, XmlEscapers.xmlContentEscaper().escape(englishname));
214
		isClient.updateDatasourceFields(dsId, changes);
215
	}
216

    
217
	@Deprecated
218
	public void updateDatasourceLogoUrl(final String dsId, final String logourl) throws DsmException {
219
		log.info(String.format("updated datasource '%s' with logo URL '%s'", dsId, logourl));
220

    
221
		dsDao.updateLogoUrl(dsId, logourl);
222
	}
223

    
224
	@Deprecated
225
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DsmException {
226
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
227
		dsDao.updateCoordinates(dsId, latitude, longitude);
228

    
229
		final Map<String, String> changes = Maps.newHashMap();
230
		changes.put(LATITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(latitude)));
231
		changes.put(LONGITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(longitude)));
232
		isClient.updateDatasourceFields(dsId, changes);
233
	}
234

    
235
	@Deprecated
236
	public void updateTimezone(final String dsId, final String timezone) throws DsmException {
237
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
238
		dsDao.updateTimezone(dsId, timezone);
239

    
240
		final Map<String, String> changes = Maps.newHashMap();
241
		changes.put(TIMEZONE, XmlEscapers.xmlContentEscaper().escape(timezone));
242
		isClient.updateDatasourceFields(dsId, changes);
243
	}
244

    
245
	@Deprecated
246
	public void updateDsTypology(final String dsId, final String typology) throws DsmException {
247
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
248
		dsDao.updateTypology(dsId, typology);
249

    
250
		final Map<String, String> changes = Maps.newHashMap();
251
		changes.put(TYPOLOGY, XmlEscapers.xmlContentEscaper().escape(typology));
252
		isClient.updateDatasourceFields(dsId, changes);
253
	}
254

    
255
	@Deprecated
256
	public void updateDsRegisteringUser(final String dsId, final String registeredBy) throws DsmException {
257
		log.info(String.format("setting datasource '%s' registering user with '%s'", dsId, registeredBy));
258
		dsDao.updateRegisteringUser(dsId, registeredBy);
259
	}
260

    
261
	@Deprecated
262
	public void updateDsPlatform(final String dsId, final String platform) throws DsmException {
263
		log.info(String.format("updated datasource '%s' platform with '%s'", dsId, platform));
264
		dsDao.updatePlatform(dsId, platform);
265

    
266
		final Map<String, String> changes = Maps.newHashMap();
267
		changes.put(PLATFORM, XmlEscapers.xmlContentEscaper().escape(platform)); // this is not a typo, Repository profiles map the platform in the DATASOURCE_TYPE field.
268
		isClient.updateDatasourceFields(dsId, changes);
269
	}
270

    
271
	//TODO remove if unused
272
	public void deleteDs(final String dsId) throws DsmException {
273
		log.info(String.format("deleted datasource '%s'", dsId));
274
		dsDao.deleteDs(dsId);
275
	}
276

    
277
	// API
278

    
279
	public void updateApiOaiSet(final String dsId, final String apiId, final String oaiSet) throws DsmException {
280
		boolean insert = dsDao.upsertApiOaiSet(apiId, oaiSet);
281
		final Map<String, String> changes = Maps.newHashMap();
282
		changes.put(OAI_SET, XmlEscapers.xmlContentEscaper().escape(oaiSet));
283

    
284
		if (!insert) {
285
			isClient.updateAPIField(dsId, apiId, changes);
286
		} else {
287
			isClient.addAPIAttribute(dsId, apiId, changes);
288
		}
289
	}
290

    
291
	public void updateApiBaseurl(final String dsId, final String apiId, final String baseUrl) throws DsmException {
292
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
293
		dsDao.updateApiBaseUrl(apiId, baseUrl);
294

    
295
		final Map<String, String> changes = Maps.newHashMap();
296
		changes.put(BASE_URL, XmlEscapers.xmlContentEscaper().escape(baseUrl));
297

    
298
		isClient.updateAPIField(dsId, apiId, changes);
299
	}
300

    
301
	public void updateApiCompatibility(final String dsId, final String apiId, final String compliance, final boolean override) throws DsmException {
302
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
303
		dsDao.updateCompliance(null, apiId, compliance, override);
304

    
305
		final Map<String, String> changes = Maps.newHashMap();
306
		changes.put(COMPLIANCE, XmlEscapers.xmlAttributeEscaper().escape(compliance));
307

    
308
		isClient.updateAPIField(dsId, apiId, changes);
309
	}
310

    
311
	public void addApi(final ApiDetails api) throws DsmException {
312
		if (StringUtils.isBlank(api.getId())) {
313
			api.setId(createId(api));
314
			log.info(String.format("missing api id, created '%s'", api.getId()));
315
		}
316

    
317
		dsDao.addApi(asDbEntry(api));
318
		isClient.registerAPI(api);
319
	}
320

    
321
	public void deleteApi(final String apiId) throws DsmForbiddenException, DsmException {
322
		//TODO handle the api removal in case of associated workflows.
323
		isClient.removeAPI(apiId);
324
		dsDao.deleteApi(null, apiId);
325
	}
326

    
327
	public void dropCaches() {
328
		mongoLoggerClient.dropCache();
329
		isClient.dropCache();
330
		vocabularyClient.dropCache();
331
	}
332

    
333
	// HELPERS //////////////
334

    
335
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
336
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
337
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
338
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
339
		return dsInfo;
340
	}
341

    
342
	private void getAggregationHistory(final String dsId,
343
			final CountDownLatch outerLatch,
344
			final Queue<Throwable> errors,
345
			final DatasourceInfo datasourceInfo) {
346
		Futures.addCallback(
347
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
348
				new FutureCallback<List<AggregationInfo>>() {
349

    
350
					public void onSuccess(List<AggregationInfo> info) {
351
						setAggregationHistory(datasourceInfo, info);
352
						outerLatch.countDown();
353
					}
354

    
355
					public void onFailure(Throwable e) {
356
						log.error(ExceptionUtils.getStackTrace(e));
357
						errors.offer(e);
358
						outerLatch.countDown();
359
					}
360
				}, executor);
361
	}
362

    
363
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
364
		datasourceInfo.setAggregationHistory(info);
365
		if (!info.isEmpty()) {
366
			datasourceInfo
367
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
368
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
369
		}
370
	}
371

    
372
	private void getIndexDsInfo(final String dsId,
373
			final CountDownLatch outerLatch,
374
			final Queue<Throwable> errors,
375
			final DatasourceInfo datasourceInfo) {
376
		Futures.addCallback(
377
				executor.submit(() -> isClient.calculateCurrentIndexDsInfo()),
378
				new FutureCallback<IndexDsInfo>() {
379

    
380
					public void onSuccess(final IndexDsInfo info) {
381

    
382
						final CountDownLatch innerLatch = new CountDownLatch(2);
383

    
384
						Futures.addCallback(
385
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
386
								new FutureCallback<IndexRecordsInfo>() {
387

    
388
									public void onSuccess(IndexRecordsInfo info) {
389
										datasourceInfo
390
												.setIndexRecords(info.getTotal())
391
												.setFundedContent(info.getFunded())
392
												.setLastIndexingDate(info.getDate());
393
										innerLatch.countDown();
394
									}
395

    
396
									public void onFailure(Throwable e) {
397
										errors.offer(e);
398
										innerLatch.countDown();
399
									}
400
								}, executor);
401

    
402
						Futures.addCallback(
403
								executor.submit(() ->
404
										objectStoreClient.getObjectStoreSize(isClient.getObjectStoreId(dsId))),
405
								new FutureCallback<Long>() {
406
									@Override
407
									public void onSuccess(final Long objectStoreSize) {
408
										datasourceInfo.setFulltexts(objectStoreSize);
409
										innerLatch.countDown();
410
									}
411

    
412
									@Override
413
									public void onFailure(final Throwable e) {
414
										errors.offer(e);
415
										innerLatch.countDown();
416
									}
417
								}, executor);
418

    
419
						waitLatch(innerLatch, errors, config.getRequestTimeout());
420

    
421
						outerLatch.countDown();
422
					}
423

    
424
					public void onFailure(final Throwable e) {
425
						//log.error(ExceptionUtils.getStackTrace(e));
426
						errors.offer(e);
427
						outerLatch.countDown();
428
					}
429
				}, executor);
430
	}
431

    
432
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
433
		try {
434
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
435
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
436
			}
437
		} catch (final InterruptedException e) {
438
			errors.offer(e);
439
		}
440
	}
441

    
442
}
(2-2/2)