Project

General

Profile

1
package eu.dnetlib.openaire.dsm;
2

    
3
import java.util.*;
4
import java.util.concurrent.CountDownLatch;
5
import java.util.concurrent.ScheduledThreadPoolExecutor;
6
import java.util.concurrent.TimeUnit;
7
import java.util.concurrent.TimeoutException;
8
import java.util.function.Function;
9
import java.util.stream.Collectors;
10
import java.util.stream.Stream;
11
import javax.annotation.PostConstruct;
12

    
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Queues;
16
import com.google.common.util.concurrent.*;
17
import com.google.common.xml.XmlEscapers;
18
import eu.dnetlib.OpenaireExporterConfig;
19
import eu.dnetlib.enabling.datasources.common.*;
20
import eu.dnetlib.openaire.common.ISClient;
21
import eu.dnetlib.openaire.community.CommunityClient;
22
import eu.dnetlib.openaire.dsm.dao.*;
23
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
24
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
25
import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo;
26
import eu.dnetlib.openaire.dsm.domain.*;
27
import eu.dnetlib.openaire.dsm.domain.db.ApiDbEntry;
28
import eu.dnetlib.openaire.dsm.domain.db.DatasourceDbEntry;
29
import eu.dnetlib.openaire.dsm.domain.db.IdentityDbEntry;
30
import eu.dnetlib.openaire.vocabularies.Country;
31
import org.apache.commons.lang3.StringUtils;
32
import org.apache.commons.lang3.exception.ExceptionUtils;
33
import org.apache.commons.logging.Log;
34
import org.apache.commons.logging.LogFactory;
35
import org.springframework.beans.factory.annotation.Autowired;
36
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
37
import org.springframework.data.domain.Page;
38
import org.springframework.stereotype.Component;
39

    
40
import static eu.dnetlib.openaire.common.ExporterConstants.*;
41
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.*;
42

    
43
@Component
44
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
45
public class DsmCore {
46

    
47
	private static final Log log = LogFactory.getLog(DsmCore.class);
48

    
49
	@Autowired
50
	private MongoLoggerClient mongoLoggerClient;
51

    
52
	@Autowired
53
	private ISClient isClient;
54

    
55
	@Autowired
56
	private ObjectStoreClient objectStoreClient;
57

    
58
	@Autowired
59
	private DatasourceIndexClient datasourceIndexClient;
60

    
61
	@Autowired
62
	private VocabularyClient vocabularyClient;
63

    
64
	@Autowired
65
	private DatasourceDao dsDao;
66

    
67
	@Autowired
68
	private OpenaireExporterConfig config;
69

    
70
	@Autowired
71
	private CommunityClient communityClient;
72

    
73
	private ListeningExecutorService executor;
74

    
75
	@PostConstruct
76
	public void init() {
77
		executor = MoreExecutors.listeningDecorator(
78
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
79
						new ThreadFactoryBuilder().setNameFormat("dsm-client-%d").build()));
80
	}
81

    
82
	public List<Country> listCountries() throws DsmException {
83
		try {
84
			return dsDao.listCountries();
85
		} catch (Throwable e) {
86
			log.error("error listing countries", e);
87
			throw e;
88
		}
89
	}
90

    
91
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
92
			throws DsmException {
93

    
94
		try {
95
			final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
96
			final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
97
			final CountDownLatch outerLatch = new CountDownLatch(2);
98

    
99
			final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
100
			if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
101
				dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(asDetails(d), outerLatch, errors)));
102
				waitLatch(outerLatch, errors, config.getRequestTimeout());
103
			}
104

    
105
			if (!errors.isEmpty()) {
106
				//TODO report on error metrics
107
				errors.forEach(e -> log.error(e));
108
			}
109
			return ResponseUtils.datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
110
		} catch (Throwable e) {
111
			log.error("error searching datasources", e);
112
			throw e;
113
		}
114
	}
115

    
116
	public DatasourceResponse searchSnippet(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
117
			throws DsmException {
118
		try {
119
			final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
120
			return ResponseUtils.datasourceResponse(dsPage.map(d -> asSnippet(d)).getContent(), Queues.newLinkedBlockingQueue(), dsPage.getTotalElements());
121
		} catch (Throwable e) {
122
			log.error("error searching datasources", e);
123
			throw e;
124
		}
125
	}
126

    
127
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DsmException {
128
		try {
129
			return dsDao.findApiBaseURLs(requestFilter, page, size);
130
		} catch (Throwable e) {
131
			log.error("error searching datasource base urls", e);
132
			throw e;
133
		}
134
	}
135

    
136
	public ApiDetailsResponse getApis(final String dsId) throws DsmException {
137
		try {
138
			final List<ApiDbEntry> apis = dsDao.getApis(dsId);
139
			final List<ApiDetails> api = apis.stream()
140
					.map(DsmMappingUtils::asDetails)
141
					.collect(Collectors.toList());
142
			return ResponseUtils.apiResponse(api, api.size());
143
		} catch (Throwable e) {
144
			log.error(String.format("error searching datasource api %s", dsId), e);
145
			throw e;
146
		}
147
	}
148

    
149
	public void setManaged(final String dsId, final boolean managed) throws DsmException {
150
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
151
		dsDao.setManaged(dsId, managed);
152
	}
153

    
154
	public boolean isManaged(final String dsId) throws DsmException {
155
		return dsDao.isManaged(dsId);
156
	}
157

    
158
	public boolean exist(final DatasourceDetails d) throws DsmException {
159
		return dsDao.existDs(d.getId());
160
	}
161

    
162
	public void save(final DatasourceDetails d) throws DsmException {
163
		try {
164
			dsDao.saveDs(asDbEntry(d));
165
			isClient.registerDS(d);
166
		} catch(Throwable e) {
167
			log.error(ExceptionUtils.getStackTrace(e));
168
			throw e;
169
		}
170
	}
171

    
172
	public void updateDatasource(final DatasourceDetailsUpdate d) throws DsmException, DsmNotFoundException {
173
		try {
174
			//initialize with current values from DB
175
			final Datasource ds = dsDao.getDs(d.getId());
176
			final DatasourceDbEntry dbEntry = (DatasourceDbEntry) ds;
177

    
178
			if (dbEntry == null) {
179
				throw new DsmNotFoundException(String.format("ds '%s' does not exist", d.getId()));
180
			}
181

    
182
			final DatasourceDbEntry update = asDbEntry(d);
183
			if (d.getIdentities() != null) {
184
				final Set<IdentityDbEntry> identities = (
185
						new HashSet<>(
186
								Stream.of(
187
										update.getIdentities(),
188
										dbEntry.getIdentities())
189
										.flatMap(Collection::stream)
190
										.collect(
191
												Collectors.toMap(
192
														i -> i.getIssuertype() + i.getPid(),
193
														Function.identity(),
194
														(i1, i2) -> i1))
195
										.values()));
196
				copyNonNullProperties(update, dbEntry);
197
				dbEntry.setIdentities(identities);
198
			} else {
199
				copyNonNullProperties(update, dbEntry);
200
			}
201

    
202
			dsDao.saveDs(dbEntry);
203
			isClient.updateDatasourceFields(d.getId(), asMapOfChanges(d));
204
		} catch (Throwable e) {
205
			log.error(ExceptionUtils.getStackTrace(e));
206
			throw e;
207
		}
208
	}
209

    
210
	@Deprecated
211
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DsmException {
212
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
213
		dsDao.updateName(dsId, officialname, englishname);
214

    
215
		final Map<String, String> changes = Maps.newHashMap();
216
		changes.put(OFFICIAL_NAME, XmlEscapers.xmlContentEscaper().escape(officialname));
217
		changes.put(ENGLISH_NAME, XmlEscapers.xmlContentEscaper().escape(englishname));
218
		isClient.updateDatasourceFields(dsId, changes);
219
	}
220

    
221
	@Deprecated
222
	public void updateDatasourceLogoUrl(final String dsId, final String logourl) throws DsmException {
223
		log.info(String.format("updated datasource '%s' with logo URL '%s'", dsId, logourl));
224

    
225
		dsDao.updateLogoUrl(dsId, logourl);
226
	}
227

    
228
	@Deprecated
229
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DsmException {
230
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
231
		dsDao.updateCoordinates(dsId, latitude, longitude);
232

    
233
		final Map<String, String> changes = Maps.newHashMap();
234
		changes.put(LATITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(latitude)));
235
		changes.put(LONGITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(longitude)));
236
		isClient.updateDatasourceFields(dsId, changes);
237
	}
238

    
239
	@Deprecated
240
	public void updateTimezone(final String dsId, final String timezone) throws DsmException {
241
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
242
		dsDao.updateTimezone(dsId, timezone);
243

    
244
		final Map<String, String> changes = Maps.newHashMap();
245
		changes.put(TIMEZONE, XmlEscapers.xmlContentEscaper().escape(timezone));
246
		isClient.updateDatasourceFields(dsId, changes);
247
	}
248

    
249
	@Deprecated
250
	public void updateDsTypology(final String dsId, final String typology) throws DsmException {
251
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
252
		dsDao.updateTypology(dsId, typology);
253

    
254
		final Map<String, String> changes = Maps.newHashMap();
255
		changes.put(TYPOLOGY, XmlEscapers.xmlContentEscaper().escape(typology));
256
		isClient.updateDatasourceFields(dsId, changes);
257
	}
258

    
259
	@Deprecated
260
	public void updateDsRegisteringUser(final String dsId, final String registeredBy) throws DsmException {
261
		log.info(String.format("setting datasource '%s' registering user with '%s'", dsId, registeredBy));
262
		dsDao.updateRegisteringUser(dsId, registeredBy);
263
	}
264

    
265
	@Deprecated
266
	public void updateDsPlatform(final String dsId, final String platform) throws DsmException {
267
		log.info(String.format("updated datasource '%s' platform with '%s'", dsId, platform));
268
		dsDao.updatePlatform(dsId, platform);
269

    
270
		final Map<String, String> changes = Maps.newHashMap();
271
		changes.put(PLATFORM, XmlEscapers.xmlContentEscaper().escape(platform)); // this is not a typo, Repository profiles map the platform in the DATASOURCE_TYPE field.
272
		isClient.updateDatasourceFields(dsId, changes);
273
	}
274

    
275
	//TODO remove if unused
276
	public void deleteDs(final String dsId) throws DsmException {
277
		log.info(String.format("deleted datasource '%s'", dsId));
278
		dsDao.deleteDs(dsId);
279
	}
280

    
281
	// API
282

    
283
	public void updateApiOaiSet(final String dsId, final String apiId, final String oaiSet) throws DsmException {
284
		boolean insert = dsDao.upsertApiOaiSet(apiId, oaiSet);
285
		final Map<String, String> changes = Maps.newHashMap();
286
		changes.put(OAI_SET, XmlEscapers.xmlContentEscaper().escape(oaiSet));
287

    
288
		if (!insert) {
289
			isClient.updateAPIField(dsId, apiId, changes);
290
		} else {
291
			isClient.addAPIAttribute(dsId, apiId, changes);
292
		}
293
	}
294

    
295
	public void updateApiBaseurl(final String dsId, final String apiId, final String baseUrl) throws DsmException {
296
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
297
		dsDao.updateApiBaseUrl(apiId, baseUrl);
298

    
299
		final Map<String, String> changes = Maps.newHashMap();
300
		changes.put(BASE_URL, XmlEscapers.xmlContentEscaper().escape(baseUrl));
301

    
302
		isClient.updateAPIField(dsId, apiId, changes);
303
	}
304

    
305
	public void updateApiCompatibility(final String dsId, final String apiId, final String compliance, final boolean override) throws DsmException {
306
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
307
		dsDao.updateCompliance(null, apiId, compliance, override);
308

    
309
		final Map<String, String> changes = Maps.newHashMap();
310
		changes.put(COMPLIANCE, XmlEscapers.xmlAttributeEscaper().escape(compliance));
311

    
312
		isClient.updateAPIField(dsId, apiId, changes);
313
	}
314

    
315
	public void addApi(final ApiDetails api) throws DsmException {
316
		if (StringUtils.isBlank(api.getId())) {
317
			api.setId(createId(api));
318
			log.info(String.format("missing api id, created '%s'", api.getId()));
319
		}
320

    
321
		dsDao.addApi(asDbEntry(api));
322
		isClient.registerAPI(api);
323
	}
324

    
325
	public void deleteApi(final String apiId) throws DsmForbiddenException, DsmException {
326
		//TODO handle the api removal in case of associated workflows.
327
		isClient.removeAPI(apiId);
328
		dsDao.deleteApi(null, apiId);
329
	}
330

    
331
	public void dropCaches() {
332
		mongoLoggerClient.dropCache();
333
		isClient.dropCache();
334
		vocabularyClient.dropCache();
335
		communityClient.dropCache();
336
	}
337

    
338
	// HELPERS //////////////
339

    
340
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
341
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
342
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
343
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
344
		return dsInfo;
345
	}
346

    
347
	private void getAggregationHistory(final String dsId,
348
			final CountDownLatch outerLatch,
349
			final Queue<Throwable> errors,
350
			final DatasourceInfo datasourceInfo) {
351
		Futures.addCallback(
352
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
353
				new FutureCallback<List<AggregationInfo>>() {
354

    
355
					public void onSuccess(List<AggregationInfo> info) {
356
						setAggregationHistory(datasourceInfo, info);
357
						outerLatch.countDown();
358
					}
359

    
360
					public void onFailure(Throwable e) {
361
						log.error(ExceptionUtils.getStackTrace(e));
362
						errors.offer(e);
363
						outerLatch.countDown();
364
					}
365
				}, executor);
366
	}
367

    
368
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
369
		datasourceInfo.setAggregationHistory(info);
370
		if (!info.isEmpty()) {
371
			datasourceInfo
372
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
373
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
374
		}
375
	}
376

    
377
	private void getIndexDsInfo(final String dsId,
378
			final CountDownLatch outerLatch,
379
			final Queue<Throwable> errors,
380
			final DatasourceInfo datasourceInfo) {
381
		Futures.addCallback(
382
				executor.submit(() -> isClient.calculateCurrentIndexDsInfo()),
383
				new FutureCallback<IndexDsInfo>() {
384

    
385
					public void onSuccess(final IndexDsInfo info) {
386

    
387
						final CountDownLatch innerLatch = new CountDownLatch(2);
388

    
389
						Futures.addCallback(
390
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
391
								new FutureCallback<IndexRecordsInfo>() {
392

    
393
									public void onSuccess(IndexRecordsInfo info) {
394
										datasourceInfo
395
												.setIndexRecords(info.getTotal())
396
												.setFundedContent(info.getFunded())
397
												.setLastIndexingDate(info.getDate());
398
										innerLatch.countDown();
399
									}
400

    
401
									public void onFailure(Throwable e) {
402
										errors.offer(e);
403
										innerLatch.countDown();
404
									}
405
								}, executor);
406

    
407
						Futures.addCallback(
408
								executor.submit(() ->
409
										objectStoreClient.getObjectStoreSize(isClient.getObjectStoreId(dsId))),
410
								new FutureCallback<Long>() {
411
									@Override
412
									public void onSuccess(final Long objectStoreSize) {
413
										datasourceInfo.setFulltexts(objectStoreSize);
414
										innerLatch.countDown();
415
									}
416

    
417
									@Override
418
									public void onFailure(final Throwable e) {
419
										errors.offer(e);
420
										innerLatch.countDown();
421
									}
422
								}, executor);
423

    
424
						waitLatch(innerLatch, errors, config.getRequestTimeout());
425

    
426
						outerLatch.countDown();
427
					}
428

    
429
					public void onFailure(final Throwable e) {
430
						//log.error(ExceptionUtils.getStackTrace(e));
431
						errors.offer(e);
432
						outerLatch.countDown();
433
					}
434
				}, executor);
435
	}
436

    
437
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
438
		try {
439
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
440
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
441
			}
442
		} catch (final InterruptedException e) {
443
			errors.offer(e);
444
		}
445
	}
446

    
447
}
(2-2/2)