Project

General

Profile

1
package eu.dnetlib.datasource.publisher.clients;
2

    
3
import java.util.List;
4
import java.util.Queue;
5
import java.util.concurrent.*;
6
import javax.annotation.Resource;
7

    
8
import com.google.common.collect.Lists;
9
import com.google.common.util.concurrent.*;
10
import eu.dnetlib.datasource.publisher.ApiException;
11
import eu.dnetlib.datasource.publisher.clients.utils.IndexDsInfo;
12
import eu.dnetlib.datasource.publisher.clients.utils.IndexRecordsInfo;
13
import eu.dnetlib.datasource.publisher.model.AggregationInfo;
14
import eu.dnetlib.datasource.publisher.model.AggregationStage;
15
import eu.dnetlib.datasource.publisher.model.DatasourceResponse;
16
import eu.dnetlib.datasource.publisher.model.db.*;
17
import eu.dnetlib.datasource.publisher.model.db.SearchInterfacesEntry;
18
import eu.dnetlib.datasource.publisher.repository.*;
19
import org.apache.commons.lang.StringUtils;
20
import org.apache.commons.lang3.exception.ExceptionUtils;
21
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23
import org.apache.http.HttpStatus;
24
import org.springframework.beans.factory.annotation.Autowired;
25
import org.springframework.beans.factory.annotation.Value;
26
import org.springframework.data.domain.Pageable;
27
import org.springframework.data.domain.Slice;
28
import org.springframework.util.concurrent.ListenableFuture;
29
import org.springframework.util.concurrent.ListenableFutureCallback;
30

    
31
/**
32
 * Created by claudio on 20/10/2016.
33
 */
34
public class DatasourceDao {
35

    
36
	private static final Log log = LogFactory.getLog(DatasourceDao.class);
37

    
38
	@Autowired
39
	private MongoLoggerClient mongoLoggerClient;
40

    
41
	@Autowired
42
	private DatasourceIndexClient datasourceIndexClient;
43

    
44
	@Autowired
45
	private DatasourceRepository dsRepository;
46

    
47
	@Autowired
48
	private SearchInterfaceRepository searchInterfaceRepository;
49

    
50
	@Autowired
51
	private CountryTermRepository countryTermRepository;
52

    
53
	@Autowired
54
	private TypologyTermRepository typologyTermRepository;
55

    
56
	@Autowired
57
	private ProtocolTermRepository protocolTermRepository;
58

    
59
	@Autowired
60
	private CompatibilityTermRepository compatibilityTermRepository;
61

    
62
	@Autowired
63
	private ActivationTermRepository activationTermRepository;
64

    
65
	@Autowired
66
	private ApiRepository apiRepository;
67

    
68
	@Resource(name = "datasourceIsLookupClient")
69
	private ISLookupClient isLookupClient;
70

    
71
	private ListeningExecutorService service;
72

    
73
	private final static int WORKERS = 100;
74

    
75
	@Value("${datasource.publisher.timeout}")
76
	private int timeout = 10;
77

    
78
	public DatasourceDao() {
79
		service = MoreExecutors.listeningDecorator(
80
				new ScheduledThreadPoolExecutor(WORKERS,
81
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
82
	}
83

    
84
	public List<String> listIds(final Pageable pageable) throws ApiException {
85
		return dsRepository.findAll(pageable)
86
				.map(d -> d.getId())
87
				.getContent();
88
	}
89

    
90
	public List<DatasourceResponse> searchByName(final String name, final Pageable pageable) {
91

    
92
		final List<DatasourceResponse> datasourceResponse = Lists.newArrayList();
93

    
94
		final CountDownLatch outerLatch = new CountDownLatch(3);
95
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
96

    
97
		log.debug(String.format("search ds by name '%s'", name));
98
		final ListenableFuture<Slice<Datasource>> c =
99
				dsRepository.findByOfficialnameContainingOrEnglishnameContainingAllIgnoreCase(name, name, pageable);
100
		c.addCallback(getSearchCallback(outerLatch, errors, datasourceResponse));
101

    
102
		waitLatch(outerLatch, errors, timeout);
103

    
104
		return datasourceResponse;
105
	}
106

    
107
	public List<DatasourceResponse> searchByCountry(final String country, final Pageable pageable) {
108

    
109
		final List<DatasourceResponse> datasourceResponse = Lists.newArrayList();
110

    
111
		final CountDownLatch outerLatch = new CountDownLatch(3);
112
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
113

    
114
		log.debug(String.format("search ds by country '%s'", country));
115
		//dsRepository.findByOrganizationsCountryIgnoreCase(country, pageable).addCallback(getSearchCallback(outerLatch, errors, datasourceResponse));
116

    
117
		waitLatch(outerLatch, errors, timeout);
118

    
119
		return datasourceResponse;
120
	}
121

    
122
	public List<DatasourceResponse> searchByContactemail(final String email, final Pageable pageable) {
123
		final List<DatasourceResponse> datasourceResponse = Lists.newArrayList();
124

    
125
		final CountDownLatch outerLatch = new CountDownLatch(3);
126
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
127

    
128
		log.debug(String.format("search ds by email '%s'", email));
129
		dsRepository.findByContactemailContainingAllIgnoreCase(email, pageable).addCallback(getSearchCallback(outerLatch, errors, datasourceResponse));
130

    
131
		waitLatch(outerLatch, errors, timeout);
132

    
133
		return datasourceResponse;
134
	}
135

    
136
	private ListenableFutureCallback<Slice<Datasource>> getSearchCallback(final CountDownLatch outerLatch,
137
			final Queue<Throwable> errors,
138
			final List<DatasourceResponse> datasourceResponse) {
139

    
140
		return new ListenableFutureCallback<Slice<Datasource>>() {
141

    
142
			@Override
143
			public void onSuccess(final Slice<Datasource> datasources) {
144
				datasources.forEach(d -> {
145
					final DatasourceResponse response = new DatasourceResponse();
146
					response.setDatasource(d);
147
					getAggregationHistory(d.getId(), outerLatch, errors, response);
148
					getIndexDsInfo(d.getId(), outerLatch, errors, response);
149
					datasourceResponse.add(response);
150
				});
151
				outerLatch.countDown();
152
			}
153

    
154
			@Override
155
			public void onFailure(final Throwable e) {
156
				errors.offer(e);
157
				outerLatch.countDown();
158
			}
159
		};
160
	}
161

    
162
	public ClientResponse getInfo(final String dsId) {
163

    
164
		final CountDownLatch outerLatch = new CountDownLatch(3);
165
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
166
		final DatasourceResponse datasourceResponse = new DatasourceResponse();
167

    
168
		getAggregationHistory(dsId, outerLatch, errors, datasourceResponse);
169

    
170
		dsRepository.findOneById(dsId).addCallback(new ListenableFutureCallback<Datasource>() {
171
			@Override
172
			public void onSuccess(final Datasource datasource) {
173
				datasourceResponse.setDatasource(datasource);
174
				outerLatch.countDown();
175
			}
176

    
177
			@Override
178
			public void onFailure(final Throwable e) {
179
				log.error(ExceptionUtils.getStackTrace(e));
180
				errors.offer(e);
181
				outerLatch.countDown();
182
			}
183
		});
184

    
185
		getIndexDsInfo(dsId, outerLatch, errors, datasourceResponse);
186

    
187
		waitLatch(outerLatch, errors, timeout);
188

    
189
		/*
190
		if (!errors.isEmpty()) {
191
			datasourceResponse.getResponseHeader().setError(Joiner.on("\n").skipNulls().join(errors.stream().map(e -> e.getMessage()).collect(Collectors.toList())));
192
			log.error(Joiner.on("\n").skipNulls().join(errors.stream().map(e -> ExceptionUtils.getFullStackTrace(e)).collect(Collectors.toList())));
193
		}
194
		*/
195

    
196
		return new ClientResponse().datasourceInfo(datasourceResponse).errors(errors);
197
	}
198

    
199
	public void setManaged(final String id, final boolean managed) {
200
		log.info(String.format("setting managed = '%s' for ds '%s'", managed, id));
201
		dsRepository.setManaged(id, managed);
202
	}
203

    
204
	public List<SearchInterfacesEntry> searchInterface(final String field, final String value) {
205
		switch (field) {
206
		case "__SEARCH__":
207
			return searchInterfaceRepository
208
					.findByRepoidContainingOrRepoNameContainingOrAlternativeNameContainingOrRepoPrefixContainingOrRepoOrganizationContainingAllIgnoreCase(
209
							value, value, value, value, value);
210
		case "country":
211
			break;
212
		case "type":
213
			break;
214
		case "protocol":
215
			break;
216
		case "compliance":
217
			break;
218
		case "active":
219
			break;
220
		default:
221
			throw new IllegalArgumentException("");
222
		}
223
		return null;
224
	}
225

    
226
	public List<CountryTerm> browseCountries() {
227
		return countryTermRepository.findAll();
228
	}
229

    
230
	public List<TypologyTerm> browseTypologies() {
231
		return typologyTermRepository.findAll();
232
	}
233

    
234
	public List<ProtocolTerm> browseProtocols() {
235
		return protocolTermRepository.findAll();
236
	}
237

    
238
	public List<CompatibilityTerm> browseCompatibility() {
239
		return compatibilityTermRepository.findAll();
240
	}
241

    
242
	public List<ActivationTerm> browseActivation() {
243
		return activationTermRepository.findAll();
244
	}
245

    
246
	public List<Api> getApi(final String dsId) {
247
		return apiRepository.findByDatasource(dsId);
248
	}
249

    
250
	public void deleteApi(final String apiId) {
251
		apiRepository.delete(apiId);
252
		log.info(String.format("deleted api '%s'", apiId));
253
	}
254

    
255
	public void addApi(final Api api) {
256

    
257
		if (StringUtils.isBlank(api.getId())) {
258
			api.setId(ApiRepository.createId(api));
259
			log.info(String.format("missing api id, created '%s'"));
260
		}
261

    
262
		apiRepository.save(api);
263
	}
264

    
265
	public boolean exist(final Datasource d) throws ApiException {
266
		try {
267
			return dsRepository.findOneById(d.getId()).get() != null;
268
		} catch (Exception e) {
269
			log.error(e);
270
			throw new ApiException(HttpStatus.SC_INTERNAL_SERVER_ERROR, String.format("error retrieving datasource information '%s'", d.getId()), e);
271
		}
272
	}
273

    
274
	public Datasource save(final Datasource d) {
275
		log.info(String.format("saving datasource '%s'", d.getId()));
276
		final Datasource datasource = dsRepository.save(d);
277
		log.info(String.format("saved datasource '%s'", datasource.getId()));
278
		return datasource;
279
	}
280

    
281
	public void updateOfficialName(final String dsId, final String officialname) {
282
		dsRepository.setOfficialname(dsId, officialname);
283
		log.info(String.format("updated datasource '%s' with officialname '%s'", dsId, officialname));
284
	}
285

    
286
	public void updateEnglishName(final String dsId, final String englishname) {
287
		dsRepository.setEnglishname(dsId, englishname);
288
		log.info(String.format("updated datasource '%s' with englishname '%s'", dsId, englishname));
289
	}
290

    
291
	public void updateLatitude(final String dsId, final Double latitude) {
292
		dsRepository.setLatitude(dsId, latitude);
293
		log.info(String.format("updated datasource '%s' with latitude '%s'", dsId, latitude));
294
	}
295

    
296
	public void updateLongitude(final String dsId, final Double longitude) {
297
		dsRepository.setLongitude(dsId, longitude);
298
		log.info(String.format("updated datasource '%s' with longitude '%s'", dsId, longitude));
299
	}
300

    
301
	private void getIndexDsInfo(final String dsId,
302
			final CountDownLatch outerLatch,
303
			final Queue<Throwable> errors,
304
			final DatasourceResponse datasourceResponse) {
305
		Futures.addCallback(
306
				service.submit(() -> isLookupClient.calculateCurrentIndexDsInfo()),
307
				new FutureCallback<IndexDsInfo>() {
308

    
309
					public void onSuccess(final IndexDsInfo info) {
310

    
311
						final CountDownLatch innerLatch = new CountDownLatch(1);
312

    
313
						Futures.addCallback(
314
								service.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info)),
315
								new FutureCallback<IndexRecordsInfo>() {
316
									public void onSuccess(IndexRecordsInfo info) {
317
										datasourceResponse.setIndexRecords(info.getCount()).setLastIndexingDate(info.getDate());
318
										innerLatch.countDown();
319
									}
320
									public void onFailure(Throwable e) {
321
										errors.offer(e);
322
										innerLatch.countDown();
323
									}
324
						});
325
						waitLatch(innerLatch, errors, timeout);
326

    
327
						outerLatch.countDown();
328
					}
329

    
330
					public void onFailure(final Throwable e) {
331
						log.error(ExceptionUtils.getStackTrace(e));
332
						errors.offer(e);
333
						outerLatch.countDown();
334
					}
335
		});
336
	}
337

    
338
	private void getAggregationHistory(final String dsId,
339
			final CountDownLatch outerLatch,
340
			final Queue<Throwable> errors,
341
			final DatasourceResponse datasourceResponse) {
342
		Futures.addCallback(
343
				service.submit(() -> mongoLoggerClient.getAggregationHistory(dsId)),
344
				new FutureCallback<List<AggregationInfo>>() {
345
					public void onSuccess(List<AggregationInfo> info) {
346
						setAggregationHistory(datasourceResponse, info);
347
						outerLatch.countDown();
348
					}
349
					public void onFailure(Throwable e) {
350
						log.error(ExceptionUtils.getStackTrace(e));
351
						errors.offer(e);
352
						outerLatch.countDown();
353
					}
354
		});
355
	}
356

    
357
	private void setAggregationHistory(DatasourceResponse datasourceResponse, final List<AggregationInfo> info) {
358
		datasourceResponse.setAggregationHistory(info);
359
		if (!info.isEmpty()) {
360
			datasourceResponse
361
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
362
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
363
		}
364
	}
365

    
366
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
367
		try {
368
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
369
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
370
			}
371
		} catch (final InterruptedException e) {
372
			errors.offer(e);
373
		}
374
	}
375

    
376
}
(2-2/7)