Project

General

Profile

1
package eu.dnetlib.openaire.dsm;
2

    
3
import java.util.*;
4
import java.util.concurrent.CountDownLatch;
5
import java.util.concurrent.ScheduledThreadPoolExecutor;
6
import java.util.concurrent.TimeUnit;
7
import java.util.concurrent.TimeoutException;
8
import java.util.function.Function;
9
import java.util.stream.Collectors;
10
import java.util.stream.Stream;
11
import javax.annotation.PostConstruct;
12

    
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Queues;
16
import com.google.common.util.concurrent.*;
17
import com.google.common.xml.XmlEscapers;
18
import eu.dnetlib.OpenaireExporterConfig;
19
import eu.dnetlib.enabling.datasources.common.*;
20
import eu.dnetlib.openaire.common.ISClient;
21
import eu.dnetlib.openaire.dsm.dao.*;
22
import eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils;
23
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
24
import eu.dnetlib.openaire.dsm.dao.utils.IndexRecordsInfo;
25
import eu.dnetlib.openaire.dsm.domain.*;
26
import eu.dnetlib.openaire.dsm.domain.db.ApiDbEntry;
27
import eu.dnetlib.openaire.dsm.domain.db.DatasourceDbEntry;
28
import eu.dnetlib.openaire.dsm.domain.db.IdentityDbEntry;
29
import eu.dnetlib.openaire.vocabularies.Country;
30
import org.apache.commons.lang3.StringUtils;
31
import org.apache.commons.lang3.exception.ExceptionUtils;
32
import org.apache.commons.logging.Log;
33
import org.apache.commons.logging.LogFactory;
34
import org.springframework.beans.factory.annotation.Autowired;
35
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
36
import org.springframework.data.domain.Page;
37
import org.springframework.stereotype.Component;
38

    
39
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.*;
40
import static eu.dnetlib.openaire.common.ExporterConstants.*;
41

    
42
@Component
43
@ConditionalOnProperty(value = "openaire.exporter.enable.dsm", havingValue = "true")
44
public class DsmCore {
45

    
46
	private static final Log log = LogFactory.getLog(DsmCore.class);
47

    
48
	@Autowired
49
	private MongoLoggerClient mongoLoggerClient;
50

    
51
	@Autowired
52
	private ISClient isClient;
53

    
54
	@Autowired
55
	private ObjectStoreClient objectStoreClient;
56

    
57
	@Autowired
58
	private DatasourceIndexClient datasourceIndexClient;
59

    
60
	@Autowired
61
	private VocabularyClient vocabularyClient;
62

    
63
	@Autowired
64
	private DatasourceDao dsDao;
65

    
66
	@Autowired
67
	private OpenaireExporterConfig config;
68

    
69
	private ListeningExecutorService executor;
70

    
71
	@PostConstruct
72
	public void init() {
73
		executor = MoreExecutors.listeningDecorator(
74
				new ScheduledThreadPoolExecutor(config.getRequestWorkers(),
75
						new ThreadFactoryBuilder().setNameFormat("dsm-client-%d").build()));
76
	}
77

    
78
	public List<Country> listCountries() throws DsmException {
79
		return dsDao.listCountries();
80
	}
81

    
82
	public DatasourceResponse search(RequestSort requestSortBy, RequestSortOrder order, RequestFilter requestFilter, int page, int size)
83
			throws DsmException {
84
		final List<DatasourceInfo> datasourceInfo = Lists.newArrayList();
85
		final Queue<Throwable> errors = Queues.newLinkedBlockingQueue();
86
		final CountDownLatch outerLatch = new CountDownLatch(2);
87

    
88
		final Page<DatasourceDbEntry> dsPage = dsDao.search(requestSortBy, order, requestFilter, page, size);
89
		if (dsPage.getTotalElements() > 0 && dsPage.getNumberOfElements() > 0) {
90
			dsPage.forEach(d -> datasourceInfo.add(enrichDatasourceInfo(asDetails(d), outerLatch, errors)));
91
			waitLatch(outerLatch, errors, config.getRequestTimeout());
92
		}
93

    
94
		return ResponseUtils.datasourceResponse(datasourceInfo, errors, dsPage.getTotalElements());
95
	}
96

    
97
	public List<String> findBaseURLs(final RequestFilter requestFilter, final int page, final int size) throws DsmException {
98
		return dsDao.findApiBaseURLs(requestFilter, page, size);
99
	}
100

    
101
	public ApiDetailsResponse getApis(final String dsId) throws DsmException {
102
		final List<ApiDbEntry> apis = dsDao.getApis(dsId);
103
		final List<ApiDetails> api = apis.stream()
104
				.map(DsmMappingUtils::asDetails)
105
				.collect(Collectors.toList());
106
		return ResponseUtils.apiResponse(api, api.size());
107
	}
108

    
109
	public void setManaged(final String dsId, final boolean managed) throws DsmException {
110
		log.info(String.format("updated api '%s' managed with '%s'", dsId, managed));
111
		dsDao.setManaged(dsId, managed);
112
	}
113

    
114
	public boolean isManaged(final String dsId) throws DsmException {
115
		return dsDao.isManaged(dsId);
116
	}
117

    
118
	public boolean exist(final DatasourceDetails d) throws DsmException {
119
		return ((DatasourceDaoImpl) dsDao).existDs(d.getId());
120
	}
121

    
122
	public void save(final DatasourceDetails d) throws DsmException {
123
		dsDao.saveDs(asDbEntry(d));
124
		isClient.registerDS(d);
125
	}
126

    
127
	public void updateDatasource(final DatasourceDetailsUpdate d) throws DsmException, DsmNotFoundException {
128
		try {
129
			final DatasourceDbEntry dbEntry = ((DatasourceDaoImpl) dsDao).getDs(d.getId());
130

    
131
			if (dbEntry == null) {
132
				throw new DsmNotFoundException(String.format("ds '%s' does not exist", d.getId()));
133
			}
134

    
135
			final DatasourceDbEntry update = asDbEntry(d);
136
			if (d.getIdentities() != null) {
137
				final Set<IdentityDbEntry> identities = (
138
						new HashSet<>(
139
								Stream.of(
140
										update.getIdentities(),
141
										dbEntry.getIdentities())
142
										.flatMap(Collection::stream)
143
										.collect(
144
												Collectors.toMap(
145
														i -> i.getIssuertype() + i.getPid(),
146
														Function.identity(),
147
														(i1, i2) -> i1))
148
										.values()));
149
				copyNonNullProperties(update, dbEntry);
150
				dbEntry.setIdentities(identities);
151
			} else {
152
				copyNonNullProperties(update, dbEntry);
153
			}
154

    
155
			dsDao.saveDs(dbEntry);
156
			isClient.updateDatasourceFields(d.getId(), asMapOfChanges(d));
157
		} catch (Throwable e) {
158
			log.error(ExceptionUtils.getStackTrace(e));
159
			throw e;
160
		}
161
	}
162

    
163
	@Deprecated
164
	public void updateDatasourcename(final String dsId, final String officialname, final String englishname) throws DsmException {
165
		log.info(String.format("updated datasource '%s' with officialname '%s' and englishname '%s'", dsId, officialname, englishname));
166
		dsDao.updateName(dsId, officialname, englishname);
167

    
168
		final Map<String, String> changes = Maps.newHashMap();
169
		changes.put(OFFICIAL_NAME, XmlEscapers.xmlContentEscaper().escape(officialname));
170
		changes.put(ENGLISH_NAME, XmlEscapers.xmlContentEscaper().escape(englishname));
171
		isClient.updateDatasourceFields(dsId, changes);
172
	}
173

    
174
	@Deprecated
175
	public void updateDatasourceLogoUrl(final String dsId, final String logourl) throws DsmException {
176
		log.info(String.format("updated datasource '%s' with logo URL '%s'", dsId, logourl));
177

    
178
		dsDao.updateLogoUrl(dsId, logourl);
179
	}
180

    
181
	@Deprecated
182
	public void updateCoordinates(final String dsId, final Double latitude, final Double longitude) throws DsmException {
183
		log.info(String.format("updated datasource '%s' with coordinates Lat:'%s', Lon:'%s'", dsId, latitude, longitude));
184
		dsDao.updateCoordinates(dsId, latitude, longitude);
185

    
186
		final Map<String, String> changes = Maps.newHashMap();
187
		changes.put(LATITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(latitude)));
188
		changes.put(LONGITUDE, XmlEscapers.xmlContentEscaper().escape(String.valueOf(longitude)));
189
		isClient.updateDatasourceFields(dsId, changes);
190
	}
191

    
192
	@Deprecated
193
	public void updateTimezone(final String dsId, final String timezone) throws DsmException {
194
		log.info(String.format("updated datasource '%s' timezone with '%s'", dsId, timezone));
195
		dsDao.updateTimezone(dsId, timezone);
196

    
197
		final Map<String, String> changes = Maps.newHashMap();
198
		changes.put(TIMEZONE, XmlEscapers.xmlContentEscaper().escape(timezone));
199
		isClient.updateDatasourceFields(dsId, changes);
200
	}
201

    
202
	@Deprecated
203
	public void updateDsTypology(final String dsId, final String typology) throws DsmException {
204
		log.info(String.format("updated datasource '%s' typology with '%s'", dsId, typology));
205
		dsDao.updateTypology(dsId, typology);
206

    
207
		final Map<String, String> changes = Maps.newHashMap();
208
		changes.put(TYPOLOGY, XmlEscapers.xmlContentEscaper().escape(typology));
209
		isClient.updateDatasourceFields(dsId, changes);
210
	}
211

    
212
	@Deprecated
213
	public void updateDsRegisteringUser(final String dsId, final String registeredBy) throws DsmException {
214
		log.info(String.format("updated datasource '%s' registering user with '%s'", dsId, registeredBy));
215
		dsDao.updateRegisteringUser(dsId, registeredBy);
216
	}
217

    
218
	@Deprecated
219
	public void updateDsPlatform(final String dsId, final String platform) throws DsmException {
220
		log.info(String.format("updated datasource '%s' platform with '%s'", dsId, platform));
221
		dsDao.updatePlatform(dsId, platform);
222

    
223
		final Map<String, String> changes = Maps.newHashMap();
224
		changes.put(PLATFORM, XmlEscapers.xmlContentEscaper().escape(platform)); // this is not a typo, Repository profiles map the platform in the DATASOURCE_TYPE field.
225
		isClient.updateDatasourceFields(dsId, changes);
226
	}
227

    
228
	//TODO remove if unused
229
	public void deleteDs(final String dsId) throws DsmException {
230
		log.info(String.format("deleted datasource '%s'", dsId));
231
		dsDao.deleteDs(dsId);
232
	}
233

    
234
	// API
235

    
236
	public void updateApiBaseurl(final String dsId, final String apiId, final String baseUrl) throws DsmException {
237
		log.info(String.format("updated api '%s' baseurl with '%s'", apiId, baseUrl));
238
		dsDao.updateApiBaseUrl(apiId, baseUrl);
239

    
240
		final Map<String, String> changes = Maps.newHashMap();
241
		changes.put(BASE_URL, XmlEscapers.xmlContentEscaper().escape(baseUrl));
242

    
243
		isClient.updateAPIField(dsId, apiId, changes);
244
	}
245

    
246
	public void updateApiCompatibility(final String dsId, final String apiId, final String compliance, final boolean override) throws DsmException {
247
		log.info(String.format("updated api '%s' compliance with '%s'", apiId, compliance));
248
		dsDao.updateCompliance(null, apiId, compliance, override);
249

    
250
		final Map<String, String> changes = Maps.newHashMap();
251
		changes.put(COMPLIANCE, XmlEscapers.xmlAttributeEscaper().escape(compliance));
252

    
253
		isClient.updateAPIField(dsId, apiId, changes);
254
	}
255

    
256
	public void addApi(final ApiDetails api) throws DsmException {
257
		if (StringUtils.isBlank(api.getId())) {
258
			api.setId(createId(api));
259
			log.info(String.format("missing api id, created '%s'", api.getId()));
260
		}
261

    
262
		dsDao.addApi(asDbEntry(api));
263
		isClient.registerAPI(api);
264
	}
265

    
266
	public void deleteApi(final String apiId) throws DsmForbiddenException, DsmException {
267
		//TODO handle the api removal in case of associated workflows.
268
		isClient.removeAPI(apiId);
269
		dsDao.deleteApi(null, apiId);
270
	}
271

    
272
	public void dropCaches() {
273
		mongoLoggerClient.dropCache();
274
		isClient.dropCache();
275
		vocabularyClient.dropCache();
276
	}
277

    
278
	// HELPERS //////////////
279

    
280
	private DatasourceInfo enrichDatasourceInfo(final DatasourceDetails d, final CountDownLatch outerLatch, final Queue<Throwable> errors) {
281
		final DatasourceInfo dsInfo = new DatasourceInfo().setDatasource(d);
282
		getAggregationHistory(d.getId(), outerLatch, errors, dsInfo);
283
		getIndexDsInfo(d.getId(), outerLatch, errors, dsInfo);
284
		return dsInfo;
285
	}
286

    
287
	private void getAggregationHistory(final String dsId,
288
			final CountDownLatch outerLatch,
289
			final Queue<Throwable> errors,
290
			final DatasourceInfo datasourceInfo) {
291
		Futures.addCallback(
292
				executor.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
293
				new FutureCallback<List<AggregationInfo>>() {
294

    
295
					public void onSuccess(List<AggregationInfo> info) {
296
						setAggregationHistory(datasourceInfo, info);
297
						outerLatch.countDown();
298
					}
299

    
300
					public void onFailure(Throwable e) {
301
						log.error(ExceptionUtils.getStackTrace(e));
302
						errors.offer(e);
303
						outerLatch.countDown();
304
					}
305
				}, executor);
306
	}
307

    
308
	private void setAggregationHistory(final DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
309
		datasourceInfo.setAggregationHistory(info);
310
		if (!info.isEmpty()) {
311
			datasourceInfo
312
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
313
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
314
		}
315
	}
316

    
317
	private void getIndexDsInfo(final String dsId,
318
			final CountDownLatch outerLatch,
319
			final Queue<Throwable> errors,
320
			final DatasourceInfo datasourceInfo) {
321
		Futures.addCallback(
322
				executor.submit(() -> isClient.calculateCurrentIndexDsInfo()),
323
				new FutureCallback<IndexDsInfo>() {
324

    
325
					public void onSuccess(final IndexDsInfo info) {
326

    
327
						final CountDownLatch innerLatch = new CountDownLatch(2);
328

    
329
						Futures.addCallback(
330
								executor.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info, errors)),
331
								new FutureCallback<IndexRecordsInfo>() {
332

    
333
									public void onSuccess(IndexRecordsInfo info) {
334
										datasourceInfo
335
												.setIndexRecords(info.getTotal())
336
												.setFundedContent(info.getFunded())
337
												.setLastIndexingDate(info.getDate());
338
										innerLatch.countDown();
339
									}
340

    
341
									public void onFailure(Throwable e) {
342
										errors.offer(e);
343
										innerLatch.countDown();
344
									}
345
								}, executor);
346

    
347
						Futures.addCallback(
348
								executor.submit(() ->
349
										objectStoreClient.getObjectStoreSize(
350
												isClient.getObjectStoreId(dsId, errors),
351
												errors)),
352
								new FutureCallback<Long>() {
353
									@Override
354
									public void onSuccess(final Long objectStoreSize) {
355
										datasourceInfo.setFulltexts(objectStoreSize);
356
										innerLatch.countDown();
357
									}
358

    
359
									@Override
360
									public void onFailure(final Throwable e) {
361
										errors.offer(e);
362
										innerLatch.countDown();
363
									}
364
								}, executor);
365

    
366
						waitLatch(innerLatch, errors, config.getRequestTimeout());
367

    
368
						outerLatch.countDown();
369
					}
370

    
371
					public void onFailure(final Throwable e) {
372
						//log.error(ExceptionUtils.getStackTrace(e));
373
						errors.offer(e);
374
						outerLatch.countDown();
375
					}
376
				}, executor);
377
	}
378

    
379
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
380
		try {
381
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
382
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
383
			}
384
		} catch (final InterruptedException e) {
385
			errors.offer(e);
386
		}
387
	}
388

    
389
}
(2-2/2)