Project

General

Profile

1
package eu.dnetlib.openaire.dsm;
2

    
3
import java.util.*;
4
import java.util.concurrent.CountDownLatch;
5
import java.util.concurrent.ScheduledThreadPoolExecutor;
6
import java.util.concurrent.TimeUnit;
7
import java.util.concurrent.TimeoutException;
8
import java.util.function.Function;
9
import java.util.stream.Collectors;
10
import java.util.stream.Stream;
11
import javax.annotation.Nullable;
12
import javax.annotation.PostConstruct;
13

    
14
import com.google.common.collect.Lists;
15
import com.google.common.collect.Maps;
16
import com.google.common.collect.Queues;
17
import com.google.common.util.concurrent.*;
18
import com.google.common.xml.XmlEscapers;
19
import eu.dnetlib.OpenaireExporterConfig;
20
import eu.dnetlib.enabling.datasources.common.*;
21
import eu.dnetlib.openaire.common.ISClient;
22
import eu.dnetlib.openaire.dsm.dao.*;
23
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
24
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
25
import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo;
26
import eu.dnetlib.openaire.dsm.domain.*;
27
import eu.dnetlib.openaire.dsm.domain.db.ApiDbEntry;
28
import eu.dnetlib.openaire.dsm.domain.db.DatasourceDbEntry;
29
import eu.dnetlib.openaire.dsm.domain.db.IdentityDbEntry;
30
import eu.dnetlib.openaire.vocabularies.Country;
31
import org.apache.commons.lang3.StringUtils;
32
import org.apache.commons.lang3.exception.ExceptionUtils;
33
import org.apache.commons.logging.Log;
34
import org.apache.commons.logging.LogFactory;
35
import org.springframework.beans.factory.annotation.Autowired;
36
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
37
import org.springframework.data.domain.Page;
38
import org.springframework.stereotype.Component;
39

    
40
import eu.dnetlib.openaire.community.CommunityClient;
41

    
42
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.*;
43
import static eu.dnetlib.openaire.common.ExporterConstants.*;
44

    
45
@Component
46
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
47
public class DsmCore {
48

    
49
	private static final Log log = LogFactory.getLog(DsmCore.class);
50

    
51
	@Autowired
52
	private MongoLoggerClient mongoLoggerClient;
53

    
54
	@Autowired
55
	private ISClient isClient;
56

    
57
	@Autowired
58
	private ObjectStoreClient objectStoreClient;
59

    
60
	@Autowired
61
	private DatasourceIndexClient datasourceIndexClient;
62

    
63
	@Autowired
64
	private VocabularyClient vocabularyClient;
65

    
66
	@Autowired
67
	private DatasourceDao dsDao;
68

    
69
	@Autowired
70
	private OpenaireExporterConfig config;
71

    
72
	@Autowired
73
	private CommunityClient communityClient;
74

    
75
	private ListeningExecutorService executor;
76

    
77
	@PostConstruct
78
	public void init() {
79
		executor = MoreExecutors.listeningDecorator(
80
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
81
						new ThreadFactoryBuilder().setNameFormat("dsm-client-%d").build()));
82
	}
83

    
84
	public List<Country> listCountries() throws DsmException {
85
		return dsDao.listCountries();
86
	}
87

    
88
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
89
			throws DsmException {
90
		final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
91
		final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
92
		final CountDownLatch outerLatch = new CountDownLatch(2);
93

    
94
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
95
		if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
96
			dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(asDetails(d), outerLatch, errors)));
97
			waitLatch(outerLatch, errors, config.getRequestTimeout());
98
		}
99

    
100
		return ResponseUtils.datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
101
	}
102

    
103
	public DatasourceResponse searchSnippet(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
104
			throws DsmException {
105

    
106
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
107
		return ResponseUtils.datasourceResponse(dsPage.map(d -> asSnippet(d)).getContent(), Queues.newLinkedBlockingQueue(), dsPage.getTotalElements());
108
	}
109

    
110
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DsmException {
111
		return dsDao.findApiBaseURLs(requestFilter, page, size);
112
	}
113

    
114
	public ApiDetailsResponse getApis(final String dsId) throws DsmException {
115
		final List<ApiDbEntry> apis = dsDao.getApis(dsId);
116
		final List<ApiDetails> api = apis.stream()
117
				.map(DsmMappingUtils::asDetails)
118
				.collect(Collectors.toList());
119
		return ResponseUtils.apiResponse(api, api.size());
120
	}
121

    
122
	public void setManaged(final String dsId, final boolean managed) throws DsmException {
123
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
124
		dsDao.setManaged(dsId, managed);
125
	}
126

    
127
	public boolean isManaged(final String dsId) throws DsmException {
128
		return dsDao.isManaged(dsId);
129
	}
130

    
131
	public boolean exist(final DatasourceDetails d) throws DsmException {
132
		return dsDao.existDs(d.getId());
133
	}
134

    
135
	public void save(final DatasourceDetails d) throws DsmException {
136
		dsDao.saveDs(asDbEntry(d));
137
		isClient.registerDS(d);
138
	}
139

    
140
	public void updateDatasource(final DatasourceDetailsUpdate d) throws DsmException, DsmNotFoundException {
141
		try {
142
			final Datasource ds = dsDao.getDs(d.getId());
143
			final DatasourceDbEntry dbEntry = (DatasourceDbEntry) ds;
144

    
145
			if (dbEntry == null) {
146
				throw new DsmNotFoundException(String.format("ds '%s' does not exist", d.getId()));
147
			}
148

    
149
			final DatasourceDbEntry update = asDbEntry(d);
150
			if (d.getIdentities() != null) {
151
				final Set<IdentityDbEntry> identities = (
152
						new HashSet<>(
153
								Stream.of(
154
										update.getIdentities(),
155
										dbEntry.getIdentities())
156
										.flatMap(Collection::stream)
157
										.collect(
158
												Collectors.toMap(
159
														i -> i.getIssuertype() + i.getPid(),
160
														Function.identity(),
161
														(i1, i2) -> i1))
162
										.values()));
163
				copyNonNullProperties(update, dbEntry);
164
				dbEntry.setIdentities(identities);
165
			} else {
166
				copyNonNullProperties(update, dbEntry);
167
			}
168

    
169
			dsDao.saveDs(dbEntry);
170
			isClient.updateDatasourceFields(d.getId(), asMapOfChanges(d));
171
		} catch (Throwable e) {
172
			log.error(ExceptionUtils.getStackTrace(e));
173
			throw e;
174
		}
175
	}
176

    
177
	@Deprecated
178
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DsmException {
179
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
180
		dsDao.updateName(dsId, officialname, englishname);
181

    
182
		final Map<String, String> changes = Maps.newHashMap();
183
		changes.put(OFFICIAL_NAME, XmlEscapers.xmlContentEscaper().escape(officialname));
184
		changes.put(ENGLISH_NAME, XmlEscapers.xmlContentEscaper().escape(englishname));
185
		isClient.updateDatasourceFields(dsId, changes);
186
	}
187

    
188
	@Deprecated
189
	public void updateDatasourceLogoUrl(final String dsId, final String logourl) throws DsmException {
190
		log.info(String.format("updated datasource '%s' with logo URL '%s'", dsId, logourl));
191

    
192
		dsDao.updateLogoUrl(dsId, logourl);
193
	}
194

    
195
	@Deprecated
196
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DsmException {
197
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
198
		dsDao.updateCoordinates(dsId, latitude, longitude);
199

    
200
		final Map<String, String> changes = Maps.newHashMap();
201
		changes.put(LATITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(latitude)));
202
		changes.put(LONGITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(longitude)));
203
		isClient.updateDatasourceFields(dsId, changes);
204
	}
205

    
206
	@Deprecated
207
	public void updateTimezone(final String dsId, final String timezone) throws DsmException {
208
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
209
		dsDao.updateTimezone(dsId, timezone);
210

    
211
		final Map<String, String> changes = Maps.newHashMap();
212
		changes.put(TIMEZONE, XmlEscapers.xmlContentEscaper().escape(timezone));
213
		isClient.updateDatasourceFields(dsId, changes);
214
	}
215

    
216
	@Deprecated
217
	public void updateDsTypology(final String dsId, final String typology) throws DsmException {
218
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
219
		dsDao.updateTypology(dsId, typology);
220

    
221
		final Map<String, String> changes = Maps.newHashMap();
222
		changes.put(TYPOLOGY, XmlEscapers.xmlContentEscaper().escape(typology));
223
		isClient.updateDatasourceFields(dsId, changes);
224
	}
225

    
226
	@Deprecated
227
	public void updateDsRegisteringUser(final String dsId, final String registeredBy) throws DsmException {
228
		log.info(String.format("setting datasource '%s' registering user with '%s'", dsId, registeredBy));
229
		dsDao.updateRegisteringUser(dsId, registeredBy);
230
	}
231

    
232
	@Deprecated
233
	public void updateDsPlatform(final String dsId, final String platform) throws DsmException {
234
		log.info(String.format("updated datasource '%s' platform with '%s'", dsId, platform));
235
		dsDao.updatePlatform(dsId, platform);
236

    
237
		final Map<String, String> changes = Maps.newHashMap();
238
		changes.put(PLATFORM, XmlEscapers.xmlContentEscaper().escape(platform)); // this is not a typo, Repository profiles map the platform in the DATASOURCE_TYPE field.
239
		isClient.updateDatasourceFields(dsId, changes);
240
	}
241

    
242
	//TODO remove if unused
243
	public void deleteDs(final String dsId) throws DsmException {
244
		log.info(String.format("deleted datasource '%s'", dsId));
245
		dsDao.deleteDs(dsId);
246
	}
247

    
248
	// API
249

    
250
	public void updateApiOaiSet(final String dsId, final String apiId, final String oaiSet) throws DsmException {
251
		boolean insert = dsDao.upsertApiOaiSet(apiId, oaiSet);
252
		final Map<String, String> changes = Maps.newHashMap();
253
		changes.put(OAI_SET, XmlEscapers.xmlContentEscaper().escape(oaiSet));
254

    
255
		if (!insert) {
256
			isClient.updateAPIField(dsId, apiId, changes);
257
		} else {
258
			isClient.addAPIAttribute(dsId, apiId, changes);
259
		}
260
	}
261

    
262
	public void updateApiBaseurl(final String dsId, final String apiId, final String baseUrl) throws DsmException {
263
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
264
		dsDao.updateApiBaseUrl(apiId, baseUrl);
265

    
266
		final Map<String, String> changes = Maps.newHashMap();
267
		changes.put(BASE_URL, XmlEscapers.xmlContentEscaper().escape(baseUrl));
268

    
269
		isClient.updateAPIField(dsId, apiId, changes);
270
	}
271

    
272
	public void updateApiCompatibility(final String dsId, final String apiId, final String compliance, final boolean override) throws DsmException {
273
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
274
		dsDao.updateCompliance(null, apiId, compliance, override);
275

    
276
		final Map<String, String> changes = Maps.newHashMap();
277
		changes.put(COMPLIANCE, XmlEscapers.xmlAttributeEscaper().escape(compliance));
278

    
279
		isClient.updateAPIField(dsId, apiId, changes);
280
	}
281

    
282
	public void addApi(final ApiDetails api) throws DsmException {
283
		if (StringUtils.isBlank(api.getId())) {
284
			api.setId(createId(api));
285
			log.info(String.format("missing api id, created '%s'", api.getId()));
286
		}
287

    
288
		dsDao.addApi(asDbEntry(api));
289
		isClient.registerAPI(api);
290
	}
291

    
292
	public void deleteApi(final String apiId) throws DsmForbiddenException, DsmException {
293
		//TODO handle the api removal in case of associated workflows.
294
		isClient.removeAPI(apiId);
295
		dsDao.deleteApi(null, apiId);
296
	}
297

    
298
	public void dropCaches() {
299
		mongoLoggerClient.dropCache();
300
		isClient.dropCache();
301
		vocabularyClient.dropCache();
302
		communityClient.dropCache();
303
	}
304

    
305
	// HELPERS //////////////
306

    
307
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
308
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
309
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
310
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
311
		return dsInfo;
312
	}
313

    
314
	private void getAggregationHistory(final String dsId,
315
			final CountDownLatch outerLatch,
316
			final Queue<Throwable> errors,
317
			final DatasourceInfo datasourceInfo) {
318
		Futures.addCallback(
319
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
320
				new FutureCallback<List<AggregationInfo>>() {
321

    
322
					public void onSuccess(List<AggregationInfo> info) {
323
						setAggregationHistory(datasourceInfo, info);
324
						outerLatch.countDown();
325
					}
326

    
327
					public void onFailure(Throwable e) {
328
						log.error(ExceptionUtils.getStackTrace(e));
329
						errors.offer(e);
330
						outerLatch.countDown();
331
					}
332
				}, executor);
333
	}
334

    
335
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
336
		datasourceInfo.setAggregationHistory(info);
337
		if (!info.isEmpty()) {
338
			datasourceInfo
339
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
340
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
341
		}
342
	}
343

    
344
	private void getIndexDsInfo(final String dsId,
345
			final CountDownLatch outerLatch,
346
			final Queue<Throwable> errors,
347
			final DatasourceInfo datasourceInfo) {
348
		Futures.addCallback(
349
				executor.submit(() -> isClient.calculateCurrentIndexDsInfo()),
350
				new FutureCallback<IndexDsInfo>() {
351

    
352
					public void onSuccess(final IndexDsInfo info) {
353

    
354
						final CountDownLatch innerLatch = new CountDownLatch(2);
355

    
356
						Futures.addCallback(
357
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
358
								new FutureCallback<IndexRecordsInfo>() {
359

    
360
									public void onSuccess(IndexRecordsInfo info) {
361
										datasourceInfo
362
												.setIndexRecords(info.getTotal())
363
												.setFundedContent(info.getFunded())
364
												.setLastIndexingDate(info.getDate());
365
										innerLatch.countDown();
366
									}
367

    
368
									public void onFailure(Throwable e) {
369
										errors.offer(e);
370
										innerLatch.countDown();
371
									}
372
								}, executor);
373

    
374
						Futures.addCallback(
375
								executor.submit(() ->
376
										objectStoreClient.getObjectStoreSize(isClient.getObjectStoreId(dsId))),
377
								new FutureCallback<Long>() {
378
									@Override
379
									public void onSuccess(final Long objectStoreSize) {
380
										datasourceInfo.setFulltexts(objectStoreSize);
381
										innerLatch.countDown();
382
									}
383

    
384
									@Override
385
									public void onFailure(final Throwable e) {
386
										errors.offer(e);
387
										innerLatch.countDown();
388
									}
389
								}, executor);
390

    
391
						waitLatch(innerLatch, errors, config.getRequestTimeout());
392

    
393
						outerLatch.countDown();
394
					}
395

    
396
					public void onFailure(final Throwable e) {
397
						//log.error(ExceptionUtils.getStackTrace(e));
398
						errors.offer(e);
399
						outerLatch.countDown();
400
					}
401
				}, executor);
402
	}
403

    
404
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
405
		try {
406
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
407
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
408
			}
409
		} catch (final InterruptedException e) {
410
			errors.offer(e);
411
		}
412
	}
413

    
414
}
(2-2/2)