Project

General

Profile

1 44690 claudio.at
package eu.dnetlib.datasource.publisher.clients;
2 44275 claudio.at
3
import java.util.List;
4 44690 claudio.at
import java.util.Queue;
5
import java.util.concurrent.*;
6 46876 claudio.at
import javax.annotation.Resource;
7 44275 claudio.at
8 44690 claudio.at
import com.google.common.util.concurrent.*;
9 44743 claudio.at
import eu.dnetlib.datasource.publisher.ApiException;
10 44765 claudio.at
import eu.dnetlib.datasource.publisher.clients.utils.IndexDsInfo;
11
import eu.dnetlib.datasource.publisher.clients.utils.IndexRecordsInfo;
12 46876 claudio.at
import eu.dnetlib.datasource.publisher.model.AggregationInfo;
13
import eu.dnetlib.datasource.publisher.model.AggregationStage;
14
import eu.dnetlib.datasource.publisher.model.DatasourceResponse;
15
import eu.dnetlib.datasource.publisher.model.db.Datasource;
16
import eu.dnetlib.datasource.publisher.model.db.SearchInterfacesEntry;
17
import eu.dnetlib.datasource.publisher.repository.DatasourceRepository;
18
import eu.dnetlib.datasource.publisher.repository.SearchApiRepository;
19 44690 claudio.at
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21 44275 claudio.at
import org.springframework.beans.factory.annotation.Autowired;
22 46718 claudio.at
import org.springframework.beans.factory.annotation.Value;
23 46876 claudio.at
import org.springframework.data.domain.Pageable;
24
import org.springframework.util.concurrent.ListenableFutureCallback;
25 44275 claudio.at
26
/**
27
 * Created by claudio on 20/10/2016.
28
 */
29
public class DatasourceInfoRetriever {
30
31 44690 claudio.at
	private static final Log log = LogFactory.getLog(DatasourceInfoRetriever.class);
32
33 44381 claudio.at
	@Autowired
34
	private MongoLoggerClient mongoLoggerClient;
35 44275 claudio.at
36
	@Autowired
37 44381 claudio.at
	private DatasourceIndexClient datasourceIndexClient;
38 44275 claudio.at
39
	@Autowired
40 46876 claudio.at
	private DatasourceRepository dsRepository;
41 44275 claudio.at
42 44690 claudio.at
	@Autowired
43 46876 claudio.at
	private SearchApiRepository searchApiRepository;
44 44690 claudio.at
45 46876 claudio.at
	@Resource(name = "datasourceIsLookupClient")
46
	private ISLookupClient isLookupClient;
47
48 44690 claudio.at
	private ListeningExecutorService service;
49
50
	private final static int WORKERS = 100;
51
52 46718 claudio.at
	@Value("${datasource.publisher.timeout}")
53
	private int timeout = 10;
54
55 44690 claudio.at
	public DatasourceInfoRetriever() {
56
		service = MoreExecutors.listeningDecorator(
57
				new ScheduledThreadPoolExecutor(WORKERS,
58
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
59
	}
60
61 46876 claudio.at
	public List<String> listIds(final Pageable pageable) throws ApiException {
62
		return dsRepository.findAll(pageable)
63
				.map(d -> d.getId())
64
				.getContent();
65 44743 claudio.at
	}
66 44690 claudio.at
67 44743 claudio.at
	public ClientResponse getInfo(final String dsId) {
68
69 44690 claudio.at
		final CountDownLatch outerLatch = new CountDownLatch(3);
70
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
71 44765 claudio.at
		final DatasourceResponse datasourceResponse = new DatasourceResponse();
72 44275 claudio.at
73 44690 claudio.at
		Futures.addCallback(getAggregationHistory(dsId), new FutureCallback<List<AggregationInfo>>() {
74
			public void onSuccess(List<AggregationInfo> info) {
75 44765 claudio.at
				setAggregationHistory(datasourceResponse, info);
76 44690 claudio.at
				outerLatch.countDown();
77
			}
78
			public void onFailure(Throwable e) {
79
				errors.offer(e);
80
				outerLatch.countDown();
81
			}
82
		});
83 44275 claudio.at
84 46876 claudio.at
		dsRepository.findOneById(dsId).addCallback(new ListenableFutureCallback<Datasource>() {
85 44690 claudio.at
			@Override
86
			public void onSuccess(final Datasource datasource) {
87 44765 claudio.at
				datasourceResponse.setDatasource(datasource);
88 44690 claudio.at
				outerLatch.countDown();
89
			}
90 44381 claudio.at
91 44690 claudio.at
			@Override
92
			public void onFailure(final Throwable e) {
93
				errors.offer(e);
94
				outerLatch.countDown();
95
			}
96
		});
97
98
		Futures.addCallback(calculateCurrentIndexDsInfo(), new FutureCallback<IndexDsInfo>() {
99
100
			public void onSuccess(final IndexDsInfo info) {
101
102
				final CountDownLatch innerLatch = new CountDownLatch(1);
103
104
				Futures.addCallback(getIndexInfo(dsId, info), new FutureCallback<IndexRecordsInfo>() {
105
					public void onSuccess(IndexRecordsInfo info) {
106 44765 claudio.at
						datasourceResponse.setIndexRecords(info.getCount()).setLastIndexingDate(info.getDate());
107 44690 claudio.at
						innerLatch.countDown();
108
					}
109
					public void onFailure(Throwable e) {
110
						errors.offer(e);
111
						innerLatch.countDown();
112
					}
113
				});
114 46718 claudio.at
				waitLatch(innerLatch, errors, timeout);
115 44690 claudio.at
116
				outerLatch.countDown();
117
			}
118
119
			public void onFailure(final Throwable e) {
120
				errors.offer(e);
121
				outerLatch.countDown();
122
			}
123
		});
124
125 46718 claudio.at
		waitLatch(outerLatch, errors, timeout);
126 44690 claudio.at
127 46876 claudio.at
		/*
128 44690 claudio.at
		if (!errors.isEmpty()) {
129 46876 claudio.at
			datasourceResponse.getResponseHeader().setError(Joiner.on("\n").skipNulls().join(errors.stream().map(e -> e.getMessage()).collect(Collectors.toList())));
130
			log.error(Joiner.on("\n").skipNulls().join(errors.stream().map(e -> ExceptionUtils.getFullStackTrace(e)).collect(Collectors.toList())));
131 44690 claudio.at
		}
132 46876 claudio.at
		*/
133 44690 claudio.at
134 44765 claudio.at
		return new ClientResponse().datasourceInfo(datasourceResponse).errors(errors);
135 44690 claudio.at
	}
136
137 46876 claudio.at
	public List<SearchInterfacesEntry> searchInterface(final String field, final String value) {
138
		switch (field) {
139
		case "__SEARCH__":
140
			return searchApiRepository
141
					.findByRepoidContainingOrRepoNameContainingOrAlternativeNameContainingOrRepoPrefixContainingOrRepoOrganizationContainingAllIgnoreCase(
142
							value, value, value, value, value);
143
		case "country":
144
			break;
145
		case "type":
146
			break;
147
		case "protocol":
148
			break;
149
		case "compliance":
150
			break;
151
		case "active":
152
			break;
153
		default:
154
			throw new IllegalArgumentException("");
155
		}
156
		return null;
157 44690 claudio.at
	}
158
159
	private ListenableFuture<IndexDsInfo> calculateCurrentIndexDsInfo() {
160 46876 claudio.at
		return service.submit(() -> isLookupClient.calculateCurrentIndexDsInfo());
161 44690 claudio.at
	}
162
163
	private ListenableFuture<IndexRecordsInfo> getIndexInfo(final String dsId, final IndexDsInfo info) {
164 46876 claudio.at
		return service.submit(() -> datasourceIndexClient.getIndexInfo(dsId, info));
165 44690 claudio.at
	}
166
167
	private ListenableFuture<List<AggregationInfo>> getAggregationHistory(final String dsId) {
168 46876 claudio.at
		return service.submit(() -> mongoLoggerClient.getAggregationHistory(dsId));
169 44690 claudio.at
	}
170
171 44765 claudio.at
	private void setAggregationHistory(DatasourceResponse datasourceResponse, final List<AggregationInfo> info) {
172
		datasourceResponse.setAggregationHistory(info);
173 44690 claudio.at
		if (!info.isEmpty()) {
174 46876 claudio.at
			datasourceResponse
175
					.setLastCollection(info.stream().filter(a -> AggregationStage.COLLECT.equals(a.getAggregationStage())).findFirst().get())
176
					.setLastTransformation(info.stream().filter(a -> AggregationStage.TRANSFORM.equals(a.getAggregationStage())).findFirst().get());
177 44275 claudio.at
		}
178
	}
179
180 44690 claudio.at
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
181
		try {
182
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
183
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
184
			}
185
		} catch (final InterruptedException e) {
186
			errors.offer(e);
187
		}
188
	}
189
190 44275 claudio.at
}