Project

General

Profile

1
package eu.dnetlib.openaire.dsm;
2

    
3
import java.util.*;
4
import java.util.concurrent.CountDownLatch;
5
import java.util.concurrent.ScheduledThreadPoolExecutor;
6
import java.util.concurrent.TimeUnit;
7
import java.util.concurrent.TimeoutException;
8
import java.util.function.Function;
9
import java.util.stream.Collectors;
10
import java.util.stream.Stream;
11
import javax.annotation.Nullable;
12
import javax.annotation.PostConstruct;
13

    
14
import com.google.common.collect.Lists;
15
import com.google.common.collect.Maps;
16
import com.google.common.collect.Queues;
17
import com.google.common.util.concurrent.*;
18
import com.google.common.xml.XmlEscapers;
19
import eu.dnetlib.OpenaireExporterConfig;
20
import eu.dnetlib.enabling.datasources.common.*;
21
import eu.dnetlib.openaire.common.ISClient;
22
import eu.dnetlib.openaire.dsm.dao.*;
23
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
24
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
25
import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo;
26
import eu.dnetlib.openaire.dsm.domain.*;
27
import eu.dnetlib.openaire.dsm.domain.db.ApiDbEntry;
28
import eu.dnetlib.openaire.dsm.domain.db.DatasourceDbEntry;
29
import eu.dnetlib.openaire.dsm.domain.db.IdentityDbEntry;
30
import eu.dnetlib.openaire.vocabularies.Country;
31
import org.apache.commons.lang3.StringUtils;
32
import org.apache.commons.lang3.exception.ExceptionUtils;
33
import org.apache.commons.logging.Log;
34
import org.apache.commons.logging.LogFactory;
35
import org.springframework.beans.factory.annotation.Autowired;
36
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
37
import org.springframework.data.domain.Page;
38
import org.springframework.stereotype.Component;
39

    
40
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.*;
41
import static eu.dnetlib.openaire.common.ExporterConstants.*;
42

    
43
@Component
44
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
45
public class DsmCore {
46

    
47
	private static final Log log = LogFactory.getLog(DsmCore.class);
48

    
49
	@Autowired
50
	private MongoLoggerClient mongoLoggerClient;
51

    
52
	@Autowired
53
	private ISClient isClient;
54

    
55
	@Autowired
56
	private ObjectStoreClient objectStoreClient;
57

    
58
	@Autowired
59
	private DatasourceIndexClient datasourceIndexClient;
60

    
61
	@Autowired
62
	private VocabularyClient vocabularyClient;
63

    
64
	@Autowired
65
	private DatasourceDao dsDao;
66

    
67
	@Autowired
68
	private OpenaireExporterConfig config;
69

    
70
	private ListeningExecutorService executor;
71

    
72
	@PostConstruct
73
	public void init() {
74
		executor = MoreExecutors.listeningDecorator(
75
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
76
						new ThreadFactoryBuilder().setNameFormat("dsm-client-%d").build()));
77
	}
78

    
79
	public List<Country> listCountries() throws DsmException {
80
		return dsDao.listCountries();
81
	}
82

    
83
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
84
			throws DsmException {
85
		final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
86
		final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
87
		final CountDownLatch outerLatch = new CountDownLatch(3);
88

    
89
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
90
		if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
91
			dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(asDetails(d), outerLatch, errors)));
92
			waitLatch(outerLatch, errors, config.getRequestTimeout());
93
		}
94

    
95
		return ResponseUtils.datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
96
	}
97

    
98
	public DatasourceResponse searchSnippet(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
99
			throws DsmException {
100

    
101
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
102
		return ResponseUtils.datasourceResponse(dsPage.map(d -> asSnippet(d)).getContent(), Queues.newLinkedBlockingQueue(), dsPage.getTotalElements());
103
	}
104

    
105
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DsmException {
106
		return dsDao.findApiBaseURLs(requestFilter, page, size);
107
	}
108

    
109
	public ApiDetailsResponse getApis(final String dsId) throws DsmException {
110
		final List<ApiDbEntry> apis = dsDao.getApis(dsId);
111
		final List<ApiDetails> api = apis.stream()
112
				.map(DsmMappingUtils::asDetails)
113
				.collect(Collectors.toList());
114
		return ResponseUtils.apiResponse(api, api.size());
115
	}
116

    
117
	public void setManaged(final String dsId, final boolean managed) throws DsmException {
118
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
119
		dsDao.setManaged(dsId, managed);
120
	}
121

    
122
	public boolean isManaged(final String dsId) throws DsmException {
123
		return dsDao.isManaged(dsId);
124
	}
125

    
126
	public boolean exist(final DatasourceDetails d) throws DsmException {
127
		return dsDao.existDs(d.getId());
128
	}
129

    
130
	public void save(final DatasourceDetails d) throws DsmException {
131
		dsDao.saveDs(asDbEntry(d));
132
		isClient.registerDS(d);
133
	}
134

    
135
	public void updateDatasource(final DatasourceDetailsUpdate d) throws DsmException, DsmNotFoundException {
136
		try {
137
			final Datasource ds = dsDao.getDs(d.getId());
138
			final DatasourceDbEntry dbEntry = (DatasourceDbEntry) ds;
139

    
140
			if (dbEntry == null) {
141
				throw new DsmNotFoundException(String.format("ds '%s' does not exist", d.getId()));
142
			}
143

    
144
			final DatasourceDbEntry update = asDbEntry(d);
145
			if (d.getIdentities() != null) {
146
				final Set<IdentityDbEntry> identities = (
147
						new HashSet<>(
148
								Stream.of(
149
										update.getIdentities(),
150
										dbEntry.getIdentities())
151
										.flatMap(Collection::stream)
152
										.collect(
153
												Collectors.toMap(
154
														i -> i.getIssuertype() + i.getPid(),
155
														Function.identity(),
156
														(i1, i2) -> i1))
157
										.values()));
158
				copyNonNullProperties(update, dbEntry);
159
				dbEntry.setIdentities(identities);
160
			} else {
161
				copyNonNullProperties(update, dbEntry);
162
			}
163

    
164
			dsDao.saveDs(dbEntry);
165
			isClient.updateDatasourceFields(d.getId(), asMapOfChanges(d));
166
		} catch (Throwable e) {
167
			log.error(ExceptionUtils.getStackTrace(e));
168
			throw e;
169
		}
170
	}
171

    
172
	@Deprecated
173
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DsmException {
174
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
175
		dsDao.updateName(dsId, officialname, englishname);
176

    
177
		final Map<String, String> changes = Maps.newHashMap();
178
		changes.put(OFFICIAL_NAME, XmlEscapers.xmlContentEscaper().escape(officialname));
179
		changes.put(ENGLISH_NAME, XmlEscapers.xmlContentEscaper().escape(englishname));
180
		isClient.updateDatasourceFields(dsId, changes);
181
	}
182

    
183
	@Deprecated
184
	public void updateDatasourceLogoUrl(final String dsId, final String logourl) throws DsmException {
185
		log.info(String.format("updated datasource '%s' with logo URL '%s'", dsId, logourl));
186

    
187
		dsDao.updateLogoUrl(dsId, logourl);
188
	}
189

    
190
	@Deprecated
191
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DsmException {
192
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
193
		dsDao.updateCoordinates(dsId, latitude, longitude);
194

    
195
		final Map<String, String> changes = Maps.newHashMap();
196
		changes.put(LATITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(latitude)));
197
		changes.put(LONGITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(longitude)));
198
		isClient.updateDatasourceFields(dsId, changes);
199
	}
200

    
201
	@Deprecated
202
	public void updateTimezone(final String dsId, final String timezone) throws DsmException {
203
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
204
		dsDao.updateTimezone(dsId, timezone);
205

    
206
		final Map<String, String> changes = Maps.newHashMap();
207
		changes.put(TIMEZONE, XmlEscapers.xmlContentEscaper().escape(timezone));
208
		isClient.updateDatasourceFields(dsId, changes);
209
	}
210

    
211
	@Deprecated
212
	public void updateDsTypology(final String dsId, final String typology) throws DsmException {
213
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
214
		dsDao.updateTypology(dsId, typology);
215

    
216
		final Map<String, String> changes = Maps.newHashMap();
217
		changes.put(TYPOLOGY, XmlEscapers.xmlContentEscaper().escape(typology));
218
		isClient.updateDatasourceFields(dsId, changes);
219
	}
220

    
221
	@Deprecated
222
	public void updateDsRegisteringUser(final String dsId, final String registeredBy) throws DsmException {
223
		log.info(String.format("updated datasource '%s' registering user with '%s'", dsId, registeredBy));
224
		dsDao.updateRegisteringUser(dsId, registeredBy);
225
	}
226

    
227
	@Deprecated
228
	public void updateDsPlatform(final String dsId, final String platform) throws DsmException {
229
		log.info(String.format("updated datasource '%s' platform with '%s'", dsId, platform));
230
		dsDao.updatePlatform(dsId, platform);
231

    
232
		final Map<String, String> changes = Maps.newHashMap();
233
		changes.put(PLATFORM, XmlEscapers.xmlContentEscaper().escape(platform)); // this is not a typo, Repository profiles map the platform in the DATASOURCE_TYPE field.
234
		isClient.updateDatasourceFields(dsId, changes);
235
	}
236

    
237
	//TODO remove if unused
238
	public void deleteDs(final String dsId) throws DsmException {
239
		log.info(String.format("deleted datasource '%s'", dsId));
240
		dsDao.deleteDs(dsId);
241
	}
242

    
243
	// API
244

    
245
	public void updateApiOaiSet(final String dsId, final String apiId, final String oaiSet) throws DsmException {
246
		boolean insert = dsDao.upsertApiOaiSet(apiId, oaiSet);
247
		final Map<String, String> changes = Maps.newHashMap();
248
		changes.put(OAI_SET, XmlEscapers.xmlContentEscaper().escape(oaiSet));
249

    
250
		if (!insert) {
251
			isClient.updateAPIField(dsId, apiId, changes);
252
		} else {
253
			isClient.addAPIAttribute(dsId, apiId, changes);
254
		}
255
	}
256

    
257
	public void updateApiBaseurl(final String dsId, final String apiId, final String baseUrl) throws DsmException {
258
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
259
		dsDao.updateApiBaseUrl(apiId, baseUrl);
260

    
261
		final Map<String, String> changes = Maps.newHashMap();
262
		changes.put(BASE_URL, XmlEscapers.xmlContentEscaper().escape(baseUrl));
263

    
264
		isClient.updateAPIField(dsId, apiId, changes);
265
	}
266

    
267
	public void updateApiCompatibility(final String dsId, final String apiId, final String compliance, final boolean override) throws DsmException {
268
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
269
		dsDao.updateCompliance(null, apiId, compliance, override);
270

    
271
		final Map<String, String> changes = Maps.newHashMap();
272
		changes.put(COMPLIANCE, XmlEscapers.xmlAttributeEscaper().escape(compliance));
273

    
274
		isClient.updateAPIField(dsId, apiId, changes);
275
	}
276

    
277
	public void addApi(final ApiDetails api) throws DsmException {
278
		if (StringUtils.isBlank(api.getId())) {
279
			api.setId(createId(api));
280
			log.info(String.format("missing api id, created '%s'", api.getId()));
281
		}
282

    
283
		dsDao.addApi(asDbEntry(api));
284
		isClient.registerAPI(api);
285
	}
286

    
287
	public void deleteApi(final String apiId) throws DsmForbiddenException, DsmException {
288
		//TODO handle the api removal in case of associated workflows.
289
		isClient.removeAPI(apiId);
290
		dsDao.deleteApi(null, apiId);
291
	}
292

    
293
	public void dropCaches() {
294
		mongoLoggerClient.dropCache();
295
		isClient.dropCache();
296
		vocabularyClient.dropCache();
297
	}
298

    
299
	// HELPERS //////////////
300

    
301
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
302
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
303
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
304
		getFirstHarvestingDate(d.getId(), outerLatch, errors, dsInfo);
305
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
306
		return dsInfo;
307
	}
308

    
309
	private void getFirstHarvestingDate(final String dsId,
310
			final CountDownLatch outerLatch,
311
			final Queue<Throwable> errors,
312
			final DatasourceInfo dsInfo) {
313
		Futures.addCallback(
314
				executor.submit(() -> mongoLoggerClient.firstHarvestDate(dsId)),
315
				new FutureCallback<String>() {
316
					@Override
317
					public void onSuccess(@Nullable final String firstHarvestDate) {
318
						dsInfo.setFirstHarvestDate(firstHarvestDate);
319
						outerLatch.countDown();
320
					}
321

    
322
					@Override
323
					public void onFailure(final Throwable e) {
324
						log.error(ExceptionUtils.getStackTrace(e));
325
						errors.offer(e);
326
						outerLatch.countDown();
327
					}
328
				}, executor);
329
	}
330

    
331
	private void getAggregationHistory(final String dsId,
332
			final CountDownLatch outerLatch,
333
			final Queue<Throwable> errors,
334
			final DatasourceInfo datasourceInfo) {
335
		Futures.addCallback(
336
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
337
				new FutureCallback<List<AggregationInfo>>() {
338

    
339
					public void onSuccess(List<AggregationInfo> info) {
340
						setAggregationHistory(datasourceInfo, info);
341
						outerLatch.countDown();
342
					}
343

    
344
					public void onFailure(Throwable e) {
345
						log.error(ExceptionUtils.getStackTrace(e));
346
						errors.offer(e);
347
						outerLatch.countDown();
348
					}
349
				}, executor);
350
	}
351

    
352
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
353
		datasourceInfo.setAggregationHistory(info);
354
		if (!info.isEmpty()) {
355
			datasourceInfo
356
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
357
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
358
		}
359
	}
360

    
361
	private void getIndexDsInfo(final String dsId,
362
			final CountDownLatch outerLatch,
363
			final Queue<Throwable> errors,
364
			final DatasourceInfo datasourceInfo) {
365
		Futures.addCallback(
366
				executor.submit(() -> isClient.calculateCurrentIndexDsInfo()),
367
				new FutureCallback<IndexDsInfo>() {
368

    
369
					public void onSuccess(final IndexDsInfo info) {
370

    
371
						final CountDownLatch innerLatch = new CountDownLatch(2);
372

    
373
						Futures.addCallback(
374
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
375
								new FutureCallback<IndexRecordsInfo>() {
376

    
377
									public void onSuccess(IndexRecordsInfo info) {
378
										datasourceInfo
379
												.setIndexRecords(info.getTotal())
380
												.setFundedContent(info.getFunded())
381
												.setLastIndexingDate(info.getDate());
382
										innerLatch.countDown();
383
									}
384

    
385
									public void onFailure(Throwable e) {
386
										errors.offer(e);
387
										innerLatch.countDown();
388
									}
389
								}, executor);
390

    
391
						Futures.addCallback(
392
								executor.submit(() ->
393
										objectStoreClient.getObjectStoreSize(isClient.getObjectStoreId(dsId))),
394
								new FutureCallback<Long>() {
395
									@Override
396
									public void onSuccess(final Long objectStoreSize) {
397
										datasourceInfo.setFulltexts(objectStoreSize);
398
										innerLatch.countDown();
399
									}
400

    
401
									@Override
402
									public void onFailure(final Throwable e) {
403
										errors.offer(e);
404
										innerLatch.countDown();
405
									}
406
								}, executor);
407

    
408
						waitLatch(innerLatch, errors, config.getRequestTimeout());
409

    
410
						outerLatch.countDown();
411
					}
412

    
413
					public void onFailure(final Throwable e) {
414
						//log.error(ExceptionUtils.getStackTrace(e));
415
						errors.offer(e);
416
						outerLatch.countDown();
417
					}
418
				}, executor);
419
	}
420

    
421
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
422
		try {
423
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
424
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
425
			}
426
		} catch (final InterruptedException e) {
427
			errors.offer(e);
428
		}
429
	}
430

    
431
}
(2-2/2)