Project

General

Profile

1
package eu.dnetlib.datasource.publisher.clients;
2

    
3
import java.util.List;
4
import java.util.Queue;
5
import java.util.concurrent.*;
6
import javax.annotation.Resource;
7

    
8
import com.google.common.util.concurrent.*;
9
import eu.dnetlib.datasource.publisher.ApiException;
10
import eu.dnetlib.datasource.publisher.clients.utils.IndexDsInfo;
11
import eu.dnetlib.datasource.publisher.clients.utils.IndexRecordsInfo;
12
import eu.dnetlib.datasource.publisher.model.AggregationInfo;
13
import eu.dnetlib.datasource.publisher.model.AggregationStage;
14
import eu.dnetlib.datasource.publisher.model.DatasourceResponse;
15
import eu.dnetlib.datasource.publisher.model.db.Datasource;
16
import eu.dnetlib.datasource.publisher.model.db.SearchInterfacesEntry;
17
import eu.dnetlib.datasource.publisher.repository.DatasourceRepository;
18
import eu.dnetlib.datasource.publisher.repository.SearchApiRepository;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.springframework.beans.factory.annotation.Autowired;
22
import org.springframework.beans.factory.annotation.Value;
23
import org.springframework.data.domain.Pageable;
24
import org.springframework.util.concurrent.ListenableFutureCallback;
25

    
26
/**
27
 * Created by claudio on 20/10/2016.
28
 */
29
public class DatasourceInfoRetriever {
30

    
31
	private static final Log log = LogFactory.getLog(DatasourceInfoRetriever.class);
32

    
33
	@Autowired
34
	private MongoLoggerClient mongoLoggerClient;
35

    
36
	@Autowired
37
	private DatasourceIndexClient datasourceIndexClient;
38

    
39
	@Autowired
40
	private DatasourceRepository dsRepository;
41

    
42
	@Autowired
43
	private SearchApiRepository searchApiRepository;
44

    
45
	@Resource(name = "datasourceIsLookupClient")
46
	private ISLookupClient isLookupClient;
47

    
48
	private ListeningExecutorService service;
49

    
50
	private final static int WORKERS = 100;
51

    
52
	@Value("${datasource.publisher.timeout}")
53
	private int timeout = 10;
54

    
55
	public DatasourceInfoRetriever() {
56
		service = MoreExecutors.listeningDecorator(
57
				new ScheduledThreadPoolExecutor(WORKERS,
58
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
59
	}
60

    
61
	public List<String> listIds(final Pageable pageable) throws ApiException {
62
		return dsRepository.findAll(pageable)
63
				.map(d -> d.getId())
64
				.getContent();
65
	}
66

    
67
	public ClientResponse getInfo(final String dsId) {
68

    
69
		final CountDownLatch outerLatch = new CountDownLatch(3);
70
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
71
		final DatasourceResponse datasourceResponse = new DatasourceResponse();
72

    
73
		Futures.addCallback(getAggregationHistory(dsId), new FutureCallback<List<AggregationInfo>>() {
74
			public void onSuccess(List<AggregationInfo> info) {
75
				setAggregationHistory(datasourceResponse, info);
76
				outerLatch.countDown();
77
			}
78
			public void onFailure(Throwable e) {
79
				errors.offer(e);
80
				outerLatch.countDown();
81
			}
82
		});
83

    
84
		dsRepository.findOneById(dsId).addCallback(new ListenableFutureCallback<Datasource>() {
85
			@Override
86
			public void onSuccess(final Datasource datasource) {
87
				datasourceResponse.setDatasource(datasource);
88
				outerLatch.countDown();
89
			}
90

    
91
			@Override
92
			public void onFailure(final Throwable e) {
93
				errors.offer(e);
94
				outerLatch.countDown();
95
			}
96
		});
97

    
98
		Futures.addCallback(calculateCurrentIndexDsInfo(), new FutureCallback<IndexDsInfo>() {
99

    
100
			public void onSuccess(final IndexDsInfo info) {
101

    
102
				final CountDownLatch innerLatch = new CountDownLatch(1);
103

    
104
				Futures.addCallback(getIndexInfo(dsId, info), new FutureCallback<IndexRecordsInfo>() {
105
					public void onSuccess(IndexRecordsInfo info) {
106
						datasourceResponse.setIndexRecords(info.getCount()).setLastIndexingDate(info.getDate());
107
						innerLatch.countDown();
108
					}
109
					public void onFailure(Throwable e) {
110
						errors.offer(e);
111
						innerLatch.countDown();
112
					}
113
				});
114
				waitLatch(innerLatch, errors, timeout);
115

    
116
				outerLatch.countDown();
117
			}
118

    
119
			public void onFailure(final Throwable e) {
120
				errors.offer(e);
121
				outerLatch.countDown();
122
			}
123
		});
124

    
125
		waitLatch(outerLatch, errors, timeout);
126

    
127
		/*
128
		if (!errors.isEmpty()) {
129
			datasourceResponse.getResponseHeader().setError(Joiner.on("\n").skipNulls().join(errors.stream().map(e -> e.getMessage()).collect(Collectors.toList())));
130
			log.error(Joiner.on("\n").skipNulls().join(errors.stream().map(e -> ExceptionUtils.getFullStackTrace(e)).collect(Collectors.toList())));
131
		}
132
		*/
133

    
134
		return new ClientResponse().datasourceInfo(datasourceResponse).errors(errors);
135
	}
136

    
137
	public List<SearchInterfacesEntry> searchInterface(final String field, final String value) {
138
		switch (field) {
139
		case "__SEARCH__":
140
			return searchApiRepository
141
					.findByRepoidContainingOrRepoNameContainingOrAlternativeNameContainingOrRepoPrefixContainingOrRepoOrganizationContainingAllIgnoreCase(
142
							value, value, value, value, value);
143
		case "country":
144
			break;
145
		case "type":
146
			break;
147
		case "protocol":
148
			break;
149
		case "compliance":
150
			break;
151
		case "active":
152
			break;
153
		default:
154
			throw new IllegalArgumentException("");
155
		}
156
		return null;
157
	}
158

    
159
	private ListenableFuture<IndexDsInfo> calculateCurrentIndexDsInfo() {
160
		return service.submit(() -> isLookupClient.calculateCurrentIndexDsInfo());
161
	}
162

    
163
	private ListenableFuture<IndexRecordsInfo> getIndexInfo(final String dsId, final IndexDsInfo info) {
164
		return service.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info));
165
	}
166

    
167
	private ListenableFuture<List<AggregationInfo>> getAggregationHistory(final String dsId) {
168
		return service.submit(() -> mongoLoggerClient.getAggregationHistory(dsId));
169
	}
170

    
171
	private void setAggregationHistory(DatasourceResponse datasourceResponse, final List<AggregationInfo> info) {
172
		datasourceResponse.setAggregationHistory(info);
173
		if (!info.isEmpty()) {
174
			datasourceResponse
175
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
176
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
177
		}
178
	}
179

    
180
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
181
		try {
182
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
183
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
184
			}
185
		} catch (final InterruptedException e) {
186
			errors.offer(e);
187
		}
188
	}
189

    
190
}
(3-3/7)