Project

General

Profile

1
package eu.dnetlib.datasource.publisher.clients;
2

    
3
import java.util.List;
4
import java.util.Queue;
5
import java.util.concurrent.*;
6

    
7
import com.google.common.base.Function;
8
import com.google.common.base.Joiner;
9
import com.google.common.base.Predicate;
10
import com.google.common.collect.Iterables;
11
import com.google.common.util.concurrent.*;
12
import eu.dnetlib.datasource.publisher.DatasourceException;
13
import eu.dnetlib.datasource.publisher.model.*;
14
import org.apache.commons.lang.exception.ExceptionUtils;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17
import org.apache.cxf.common.i18n.Exception;
18
import org.springframework.beans.factory.annotation.Autowired;
19

    
20
/**
21
 * Created by claudio on 20/10/2016.
22
 */
23
public class DatasourceInfoRetriever {
24

    
25
	private static final Log log = LogFactory.getLog(DatasourceInfoRetriever.class);
26

    
27
	@Autowired
28
	private MongoLoggerClient mongoLoggerClient;
29

    
30
	@Autowired
31
	private DatasourceIndexClient datasourceIndexClient;
32

    
33
	@Autowired
34
	private ISLookupClient lookupClient;
35

    
36
	@Autowired
37
	private JdbcDatasourceDao jdbcDatasourceDao;
38

    
39
	private ListeningExecutorService service;
40

    
41
	private final static int WORKERS = 100;
42

    
43
	public DatasourceInfoRetriever() {
44
		service = MoreExecutors.listeningDecorator(
45
				new ScheduledThreadPoolExecutor(WORKERS,
46
						new ThreadFactoryBuilder().setNameFormat("datasource-info-retriever-%d").build()));
47
	}
48

    
49
	public DatasourceInfo getInfo(final String dsId) throws DatasourceException {
50

    
51
		final CountDownLatch outerLatch = new CountDownLatch(3);
52
		final Queue<Throwable> errors = new ConcurrentLinkedQueue<>();
53
		final DatasourceInfo datasourceInfo = new DatasourceInfo();
54

    
55
		Futures.addCallback(getAggregationHistory(dsId), new FutureCallback<List<AggregationInfo>>() {
56
			public void onSuccess(List<AggregationInfo> info) {
57
				setAggregationHistory(datasourceInfo, info);
58
				outerLatch.countDown();
59
			}
60
			public void onFailure(Throwable e) {
61
				errors.offer(e);
62
				outerLatch.countDown();
63
			}
64
		});
65

    
66
		Futures.addCallback(getDatasourceDetails(dsId), new FutureCallback<Datasource>() {
67
			@Override
68
			public void onSuccess(final Datasource datasource) {
69
				datasourceInfo.setDatasource(datasource);
70
				outerLatch.countDown();
71
			}
72

    
73
			@Override
74
			public void onFailure(final Throwable e) {
75
				errors.offer(e);
76
				outerLatch.countDown();
77
			}
78
		});
79

    
80
		Futures.addCallback(calculateCurrentIndexDsInfo(), new FutureCallback<IndexDsInfo>() {
81

    
82
			public void onSuccess(final IndexDsInfo info) {
83

    
84
				final CountDownLatch innerLatch = new CountDownLatch(1);
85

    
86
				Futures.addCallback(getIndexInfo(dsId, info), new FutureCallback<IndexRecordsInfo>() {
87
					public void onSuccess(IndexRecordsInfo info) {
88
						datasourceInfo.setIndexRecords(info.getCount()).setLastIndexingDate(info.getDate());
89
						innerLatch.countDown();
90
					}
91
					public void onFailure(Throwable e) {
92
						errors.offer(e);
93
						innerLatch.countDown();
94
					}
95
				});
96
				waitLatch(innerLatch, errors, 10);
97

    
98
				outerLatch.countDown();
99
			}
100

    
101
			public void onFailure(final Throwable e) {
102
				errors.offer(e);
103
				outerLatch.countDown();
104
			}
105
		});
106

    
107
		waitLatch(outerLatch, errors, 10);
108

    
109
		if (!errors.isEmpty()) {
110
			datasourceInfo.getResponseHeader().setError(Joiner.on("\n").join(Iterables.transform(errors, new Function<Throwable, String>() {
111
				@Override
112
				public String apply(final Throwable e) {
113
					return e.getMessage();
114
				}
115
			})));
116
			log.error(Joiner.on("\n").join(Iterables.transform(errors, new Function<Throwable, String>() {
117
				@Override
118
				public String apply(final Throwable e) {
119
					return ExceptionUtils.getFullStackTrace(e);
120
				}
121
			})));
122
		}
123

    
124
		return datasourceInfo;
125
	}
126

    
127
	private ListenableFuture<Datasource> getDatasourceDetails(final String dsId) {
128
		return service.submit(new Callable<Datasource>() {
129
			@Override
130
			public Datasource call() throws DatasourceException {
131
				return jdbcDatasourceDao.getDatasource(dsId);
132
			}
133
		});
134
	}
135

    
136
	private ListenableFuture<IndexDsInfo> calculateCurrentIndexDsInfo() {
137
		return service.submit(new Callable<IndexDsInfo>() {
138
				public IndexDsInfo call() throws DatasourceException {
139
					return lookupClient.calculateCurrentIndexDsInfo();
140
				}
141
			});
142
	}
143

    
144
	private ListenableFuture<IndexRecordsInfo> getIndexInfo(final String dsId, final IndexDsInfo info) {
145
		return service.submit(new Callable<IndexRecordsInfo>() {
146
			public IndexRecordsInfo call() throws DatasourceException {
147
				return datasourceIndexClient.getIndexInfo(dsId, info);
148
			}
149
		});
150
	}
151

    
152
	private ListenableFuture<List<AggregationInfo>> getAggregationHistory(final String dsId) {
153
		return service.submit(new Callable<List<AggregationInfo>>() {
154
			public List<AggregationInfo> call() throws DatasourceException {
155
				return mongoLoggerClient.getAggregationHistory(dsId);
156
			}
157
		});
158
	}
159

    
160
	private void setAggregationHistory(DatasourceInfo datasourceInfo, final List<AggregationInfo> info) {
161
		datasourceInfo.setAggregationHistory(info);
162
		if (!info.isEmpty()) {
163
			datasourceInfo.setLastCollection(Iterables.find(info, new Predicate<AggregationInfo>() {
164
				@Override
165
				public boolean apply(final AggregationInfo a) {
166
					return AggregationStage.COLLECT.equals(a.getAggregationStage());
167
				}
168
			})).setLastTransformation(Iterables.find(info, new Predicate<AggregationInfo>() {
169
				@Override
170
				public boolean apply(final AggregationInfo a) {
171
					return AggregationStage.TRANSFORM.equals(a.getAggregationStage());
172
				}
173
			}));
174
		}
175
	}
176

    
177
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
178
		try {
179
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
180
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
181
			}
182
		} catch (final InterruptedException e) {
183
			errors.offer(e);
184
		}
185
	}
186

    
187
}
(2-2/6)