Project

General

Profile

1
package eu.dnetlib.openaire.dsm;
2

    
3
import java.util.*;
4
import java.util.concurrent.CountDownLatch;
5
import java.util.concurrent.ScheduledThreadPoolExecutor;
6
import java.util.concurrent.TimeUnit;
7
import java.util.concurrent.TimeoutException;
8
import java.util.function.Function;
9
import java.util.stream.Collectors;
10
import java.util.stream.Stream;
11
import javax.annotation.Nullable;
12
import javax.annotation.PostConstruct;
13

    
14
import com.google.common.collect.Lists;
15
import com.google.common.collect.Maps;
16
import com.google.common.collect.Queues;
17
import com.google.common.util.concurrent.*;
18
import com.google.common.xml.XmlEscapers;
19
import eu.dnetlib.OpenaireExporterConfig;
20
import eu.dnetlib.enabling.datasources.common.*;
21
import eu.dnetlib.openaire.common.ISClient;
22
import eu.dnetlib.openaire.dsm.dao.*;
23
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
24
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
25
import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo;
26
import eu.dnetlib.openaire.dsm.domain.*;
27
import eu.dnetlib.openaire.dsm.domain.db.ApiDbEntry;
28
import eu.dnetlib.openaire.dsm.domain.db.DatasourceDbEntry;
29
import eu.dnetlib.openaire.dsm.domain.db.IdentityDbEntry;
30
import eu.dnetlib.openaire.vocabularies.Country;
31
import org.apache.commons.lang3.StringUtils;
32
import org.apache.commons.lang3.exception.ExceptionUtils;
33
import org.apache.commons.logging.Log;
34
import org.apache.commons.logging.LogFactory;
35
import org.springframework.beans.factory.annotation.Autowired;
36
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
37
import org.springframework.data.domain.Page;
38
import org.springframework.stereotype.Component;
39

    
40
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.*;
41
import static eu.dnetlib.openaire.common.ExporterConstants.*;
42

    
43
@Component
44
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
45
public class DsmCore {
46

    
47
	private static final Log log = LogFactory.getLog(DsmCore.class);
48

    
49
	@Autowired
50
	private MongoLoggerClient mongoLoggerClient;
51

    
52
	@Autowired
53
	private ISClient isClient;
54

    
55
	@Autowired
56
	private ObjectStoreClient objectStoreClient;
57

    
58
	@Autowired
59
	private DatasourceIndexClient datasourceIndexClient;
60

    
61
	@Autowired
62
	private VocabularyClient vocabularyClient;
63

    
64
	@Autowired
65
	private DatasourceDao dsDao;
66

    
67
	@Autowired
68
	private OpenaireExporterConfig config;
69

    
70
	private ListeningExecutorService executor;
71

    
72
	@PostConstruct
73
	public void init() {
74
		executor = MoreExecutors.listeningDecorator(
75
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
76
						new ThreadFactoryBuilder().setNameFormat("dsm-client-%d").build()));
77
	}
78

    
79
	public List<Country> listCountries() throws DsmException {
80
		return dsDao.listCountries();
81
	}
82

    
83
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
84
			throws DsmException {
85
		final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
86
		final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
87
		final CountDownLatch outerLatch = new CountDownLatch(2);
88

    
89
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
90
		if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
91
			dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(asDetails(d), outerLatch, errors)));
92
			waitLatch(outerLatch, errors, config.getRequestTimeout());
93
		}
94

    
95
		return ResponseUtils.datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
96
	}
97

    
98
	public DatasourceResponse searchSnippet(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
99
			throws DsmException {
100

    
101
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
102
		return ResponseUtils.datasourceResponse(dsPage.map(d -> asSnippet(d)).getContent(), Queues.newLinkedBlockingQueue(), dsPage.getTotalElements());
103
	}
104

    
105
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DsmException {
106
		return dsDao.findApiBaseURLs(requestFilter, page, size);
107
	}
108

    
109
	public ApiDetailsResponse getApis(final String dsId) throws DsmException {
110
		final List<ApiDbEntry> apis = dsDao.getApis(dsId);
111
		final List<ApiDetails> api = apis.stream()
112
				.map(DsmMappingUtils::asDetails)
113
				.collect(Collectors.toList());
114
		return ResponseUtils.apiResponse(api, api.size());
115
	}
116

    
117
	public void setManaged(final String dsId, final boolean managed) throws DsmException {
118
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
119
		dsDao.setManaged(dsId, managed);
120
	}
121

    
122
	public boolean isManaged(final String dsId) throws DsmException {
123
		return dsDao.isManaged(dsId);
124
	}
125

    
126
	public boolean exist(final DatasourceDetails d) throws DsmException {
127
		return dsDao.existDs(d.getId());
128
	}
129

    
130
	public void save(final DatasourceDetails d) throws DsmException {
131
		dsDao.saveDs(asDbEntry(d));
132
		isClient.registerDS(d);
133
	}
134

    
135
	public void updateDatasource(final DatasourceDetailsUpdate d) throws DsmException, DsmNotFoundException {
136
		try {
137
			final Datasource ds = dsDao.getDs(d.getId());
138
			final DatasourceDbEntry dbEntry = (DatasourceDbEntry) ds;
139

    
140
			if (dbEntry == null) {
141
				throw new DsmNotFoundException(String.format("ds '%s' does not exist", d.getId()));
142
			}
143

    
144
			final DatasourceDbEntry update = asDbEntry(d);
145
			if (d.getIdentities() != null) {
146
				final Set<IdentityDbEntry> identities = (
147
						new HashSet<>(
148
								Stream.of(
149
										update.getIdentities(),
150
										dbEntry.getIdentities())
151
										.flatMap(Collection::stream)
152
										.collect(
153
												Collectors.toMap(
154
														i -> i.getIssuertype() + i.getPid(),
155
														Function.identity(),
156
														(i1, i2) -> i1))
157
										.values()));
158
				copyNonNullProperties(update, dbEntry);
159
				dbEntry.setIdentities(identities);
160
			} else {
161
				copyNonNullProperties(update, dbEntry);
162
			}
163

    
164
			dsDao.saveDs(dbEntry);
165
			isClient.updateDatasourceFields(d.getId(), asMapOfChanges(d));
166
		} catch (Throwable e) {
167
			log.error(ExceptionUtils.getStackTrace(e));
168
			throw e;
169
		}
170
	}
171

    
172
	@Deprecated
173
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DsmException {
174
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
175
		dsDao.updateName(dsId, officialname, englishname);
176

    
177
		final Map<String, String> changes = Maps.newHashMap();
178
		changes.put(OFFICIAL_NAME, XmlEscapers.xmlContentEscaper().escape(officialname));
179
		changes.put(ENGLISH_NAME, XmlEscapers.xmlContentEscaper().escape(englishname));
180
		isClient.updateDatasourceFields(dsId, changes);
181
	}
182

    
183
	@Deprecated
184
	public void updateDatasourceLogoUrl(final String dsId, final String logourl) throws DsmException {
185
		log.info(String.format("updated datasource '%s' with logo URL '%s'", dsId, logourl));
186

    
187
		dsDao.updateLogoUrl(dsId, logourl);
188
	}
189

    
190
	@Deprecated
191
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DsmException {
192
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
193
		dsDao.updateCoordinates(dsId, latitude, longitude);
194

    
195
		final Map<String, String> changes = Maps.newHashMap();
196
		changes.put(LATITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(latitude)));
197
		changes.put(LONGITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(longitude)));
198
		isClient.updateDatasourceFields(dsId, changes);
199
	}
200

    
201
	@Deprecated
202
	public void updateTimezone(final String dsId, final String timezone) throws DsmException {
203
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
204
		dsDao.updateTimezone(dsId, timezone);
205

    
206
		final Map<String, String> changes = Maps.newHashMap();
207
		changes.put(TIMEZONE, XmlEscapers.xmlContentEscaper().escape(timezone));
208
		isClient.updateDatasourceFields(dsId, changes);
209
	}
210

    
211
	@Deprecated
212
	public void updateDsTypology(final String dsId, final String typology) throws DsmException {
213
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
214
		dsDao.updateTypology(dsId, typology);
215

    
216
		final Map<String, String> changes = Maps.newHashMap();
217
		changes.put(TYPOLOGY, XmlEscapers.xmlContentEscaper().escape(typology));
218
		isClient.updateDatasourceFields(dsId, changes);
219
	}
220

    
221
	@Deprecated
222
	public void updateDsRegisteringUser(final String dsId, final String registeredBy) throws DsmException {
223
		log.info(String.format("setting datasource '%s' registering user with '%s'", dsId, registeredBy));
224
		dsDao.updateRegisteringUser(dsId, registeredBy);
225
	}
226

    
227
	@Deprecated
228
	public void updateDsPlatform(final String dsId, final String platform) throws DsmException {
229
		log.info(String.format("updated datasource '%s' platform with '%s'", dsId, platform));
230
		dsDao.updatePlatform(dsId, platform);
231

    
232
		final Map<String, String> changes = Maps.newHashMap();
233
		changes.put(PLATFORM, XmlEscapers.xmlContentEscaper().escape(platform)); // this is not a typo, Repository profiles map the platform in the DATASOURCE_TYPE field.
234
		isClient.updateDatasourceFields(dsId, changes);
235
	}
236

    
237
	//TODO remove if unused
238
	public void deleteDs(final String dsId) throws DsmException {
239
		log.info(String.format("deleted datasource '%s'", dsId));
240
		dsDao.deleteDs(dsId);
241
	}
242

    
243
	// API
244

    
245
	public void updateApiOaiSet(final String dsId, final String apiId, final String oaiSet) throws DsmException {
246
		boolean insert = dsDao.upsertApiOaiSet(apiId, oaiSet);
247
		final Map<String, String> changes = Maps.newHashMap();
248
		changes.put(OAI_SET, XmlEscapers.xmlContentEscaper().escape(oaiSet));
249

    
250
		if (!insert) {
251
			isClient.updateAPIField(dsId, apiId, changes);
252
		} else {
253
			isClient.addAPIAttribute(dsId, apiId, changes);
254
		}
255
	}
256

    
257
	public void updateApiBaseurl(final String dsId, final String apiId, final String baseUrl) throws DsmException {
258
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
259
		dsDao.updateApiBaseUrl(apiId, baseUrl);
260

    
261
		final Map<String, String> changes = Maps.newHashMap();
262
		changes.put(BASE_URL, XmlEscapers.xmlContentEscaper().escape(baseUrl));
263

    
264
		isClient.updateAPIField(dsId, apiId, changes);
265
	}
266

    
267
	public void updateApiCompatibility(final String dsId, final String apiId, final String compliance, final boolean override) throws DsmException {
268
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
269
		dsDao.updateCompliance(null, apiId, compliance, override);
270

    
271
		final Map<String, String> changes = Maps.newHashMap();
272
		changes.put(COMPLIANCE, XmlEscapers.xmlAttributeEscaper().escape(compliance));
273

    
274
		isClient.updateAPIField(dsId, apiId, changes);
275
	}
276

    
277
	public void addApi(final ApiDetails api) throws DsmException {
278
		if (StringUtils.isBlank(api.getId())) {
279
			api.setId(createId(api));
280
			log.info(String.format("missing api id, created '%s'", api.getId()));
281
		}
282

    
283
		dsDao.addApi(asDbEntry(api));
284
		isClient.registerAPI(api);
285
	}
286

    
287
	public void deleteApi(final String apiId) throws DsmForbiddenException, DsmException {
288
		//TODO handle the api removal in case of associated workflows.
289
		isClient.removeAPI(apiId);
290
		dsDao.deleteApi(null, apiId);
291
	}
292

    
293
	public void dropCaches() {
294
		mongoLoggerClient.dropCache();
295
		isClient.dropCache();
296
		vocabularyClient.dropCache();
297
	}
298

    
299
	// HELPERS //////////////
300

    
301
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
302
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
303
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
304
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
305
		return dsInfo;
306
	}
307

    
308
	private void getAggregationHistory(final String dsId,
309
			final CountDownLatch outerLatch,
310
			final Queue<Throwable> errors,
311
			final DatasourceInfo datasourceInfo) {
312
		Futures.addCallback(
313
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
314
				new FutureCallback<List<AggregationInfo>>() {
315

    
316
					public void onSuccess(List<AggregationInfo> info) {
317
						setAggregationHistory(datasourceInfo, info);
318
						outerLatch.countDown();
319
					}
320

    
321
					public void onFailure(Throwable e) {
322
						log.error(ExceptionUtils.getStackTrace(e));
323
						errors.offer(e);
324
						outerLatch.countDown();
325
					}
326
				}, executor);
327
	}
328

    
329
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
330
		datasourceInfo.setAggregationHistory(info);
331
		if (!info.isEmpty()) {
332
			datasourceInfo
333
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
334
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
335
		}
336
	}
337

    
338
	private void getIndexDsInfo(final String dsId,
339
			final CountDownLatch outerLatch,
340
			final Queue<Throwable> errors,
341
			final DatasourceInfo datasourceInfo) {
342
		Futures.addCallback(
343
				executor.submit(() -> isClient.calculateCurrentIndexDsInfo()),
344
				new FutureCallback<IndexDsInfo>() {
345

    
346
					public void onSuccess(final IndexDsInfo info) {
347

    
348
						final CountDownLatch innerLatch = new CountDownLatch(2);
349

    
350
						Futures.addCallback(
351
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
352
								new FutureCallback<IndexRecordsInfo>() {
353

    
354
									public void onSuccess(IndexRecordsInfo info) {
355
										datasourceInfo
356
												.setIndexRecords(info.getTotal())
357
												.setFundedContent(info.getFunded())
358
												.setLastIndexingDate(info.getDate());
359
										innerLatch.countDown();
360
									}
361

    
362
									public void onFailure(Throwable e) {
363
										errors.offer(e);
364
										innerLatch.countDown();
365
									}
366
								}, executor);
367

    
368
						Futures.addCallback(
369
								executor.submit(() ->
370
										objectStoreClient.getObjectStoreSize(isClient.getObjectStoreId(dsId))),
371
								new FutureCallback<Long>() {
372
									@Override
373
									public void onSuccess(final Long objectStoreSize) {
374
										datasourceInfo.setFulltexts(objectStoreSize);
375
										innerLatch.countDown();
376
									}
377

    
378
									@Override
379
									public void onFailure(final Throwable e) {
380
										errors.offer(e);
381
										innerLatch.countDown();
382
									}
383
								}, executor);
384

    
385
						waitLatch(innerLatch, errors, config.getRequestTimeout());
386

    
387
						outerLatch.countDown();
388
					}
389

    
390
					public void onFailure(final Throwable e) {
391
						//log.error(ExceptionUtils.getStackTrace(e));
392
						errors.offer(e);
393
						outerLatch.countDown();
394
					}
395
				}, executor);
396
	}
397

    
398
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
399
		try {
400
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
401
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
402
			}
403
		} catch (final InterruptedException e) {
404
			errors.offer(e);
405
		}
406
	}
407

    
408
}
(2-2/2)