Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource.clients;
2

    
3
import java.io.IOException;
4
import java.util.Calendar;
5
import java.util.Map;
6
import java.util.Queue;
7
import java.util.concurrent.*;
8
import javax.annotation.PostConstruct;
9
import javax.annotation.PreDestroy;
10

    
11
import com.google.common.collect.Iterables;
12
import com.google.common.util.concurrent.*;
13
import eu.dnetlib.OpenaireExporterConfig;
14
import eu.dnetlib.miscutils.functional.hash.Hashing;
15
import eu.dnetlib.openaire.exporter.datasource.ApiException;
16
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
17
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
18
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
19
import org.apache.commons.lang.StringUtils;
20
import org.apache.commons.lang.time.DateFormatUtils;
21
import org.apache.commons.lang3.exception.ExceptionUtils;
22
import org.apache.commons.logging.Log;
23
import org.apache.commons.logging.LogFactory;
24
import org.apache.solr.client.solrj.SolrClient;
25
import org.apache.solr.client.solrj.SolrQuery;
26
import org.apache.solr.client.solrj.SolrServerException;
27
import org.apache.solr.client.solrj.impl.CloudSolrClient;
28
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
29
import org.apache.solr.client.solrj.response.QueryResponse;
30
import org.apache.solr.common.SolrDocument;
31
import org.springframework.beans.factory.annotation.Autowired;
32
import org.springframework.cache.annotation.Cacheable;
33
import org.springframework.http.HttpStatus;
34
import org.springframework.stereotype.Component;
35

    
36
/**
37
 * Created by claudio on 20/10/2016.
38
 */
39
@Component
40
public class DatasourceIndexClient {
41

    
42
	private static final Log log = LogFactory.getLog(DatasourceIndexClient.class);
43

    
44
	public static final String SEPARATOR = "::";
45
	public static final String DSVERSION = "__dsversion";
46

    
47
	@Autowired
48
	private OpenaireExporterConfig config;
49

    
50
	private ListeningExecutorService executor;
51

    
52
	private static Map<String, SolrClient> indexClientMap = new ConcurrentHashMap<>();
53

    
54
	@PostConstruct
55
	public void init() {
56
		executor = MoreExecutors.listeningDecorator(
57
				new ScheduledThreadPoolExecutor(2,
58
						new ThreadFactoryBuilder().setNameFormat("datasource-index-client-%d").build()));
59
	}
60

    
61
	@PreDestroy
62
	public void tearDown() {
63
		indexClientMap.forEach((name, client) -> {
64
			try {
65
				client.close();
66
			} catch (IOException e) {
67
				log.warn(String.format("unable to gracefully shutdown client for index %s", name));
68
			}
69
		});
70
	}
71

    
72
	public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws ApiException {
73
		try {
74
			final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
75
			final SolrClient indexClient = getIndexClient(info);
76
			final CountDownLatch latch = new CountDownLatch(2);
77
			final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
78

    
79
			Futures.addCallback(
80
				executor.submit(() -> setTotal(collectedFrom, indexClient)),
81
						new FutureCallback<IndexRecordsInfo>() {
82
							@Override
83
							public void onSuccess(final IndexRecordsInfo info) {
84
								indexRecordInfo
85
										.setTotal(info.getTotal())
86
										.setDate(info.getDate());
87
								latch.countDown();
88
							}
89

    
90
							@Override
91
							public void onFailure(final Throwable e) {
92
								log.error(ExceptionUtils.getStackTrace(e));
93
								errors.offer(e);
94
								latch.countDown();
95
							}
96
						});
97

    
98
			Futures.addCallback(
99
					executor.submit(() -> setFunded(collectedFrom, indexClient)),
100
					new FutureCallback<Long>() {
101
						@Override
102
						public void onSuccess(final Long numFound) {
103
							indexRecordInfo.setFunded(numFound);
104
							latch.countDown();
105
						}
106

    
107
						@Override
108
						public void onFailure(final Throwable e) {
109
							log.error(ExceptionUtils.getStackTrace(e));
110
							errors.offer(e);
111
							latch.countDown();
112
						}
113
					});
114

    
115
			waitLatch(latch, errors, config.getRequestTimeout());
116
			return indexRecordInfo;
117
		} catch (final Throwable e) {
118
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying publications from: " + dsId, e);
119
		}
120
	}
121

    
122
	private Long setFunded(
123
			final String collectedFrom,
124
			final SolrClient indexClient) throws ApiException {
125

    
126
		final SolrQuery query = new SolrQuery(
127
				String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom))
128
				.setRows(0);
129

    
130
		log.debug(String.format("query: %s", query));
131
		try {
132
			return indexClient.query(query).getResults().getNumFound();
133
		} catch (Throwable e) {
134
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
135
		}
136
	}
137

    
138
	private IndexRecordsInfo setTotal(
139
			final String collectedFrom,
140
			final SolrClient indexClient) throws ApiException {
141
		try {
142
			final SolrQuery query = new SolrQuery(
143
					String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom))
144
					.setRows(1);
145
			log.debug(String.format("query: %s", query));
146

    
147
			final QueryResponse rsp = indexClient.query(query);
148
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
149
			if (log.isDebugEnabled()) {
150
				log.debug(String.format("got document %s", doc.toString()));
151
			}
152

    
153
			return new IndexRecordsInfo()
154
					.setDate(getLastIndexingDate(indexClient))
155
					.setTotal(rsp.getResults().getNumFound());
156
		} catch (Throwable e) {
157
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
158
		}
159
	}
160

    
161
	@Cacheable("index-cache")
162
	public String getLastIndexingDate(final SolrClient indexClient) throws ApiException {
163
		try {
164
			final QueryResponse rsp = indexClient.query(new SolrQuery("oaftype:datasource").setRows(1));
165
			if (rsp.getResults().getNumFound() > 0) {
166
				final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
167

    
168
				final String dsversion = doc.get(DSVERSION).toString();
169
				return StringUtils.substringBefore(dsversion, "T");
170
			} else {
171
				final String defaultDate = getDefaultLastIndexingDate();
172
				log.debug("unable to find documents, defaulting to " + defaultDate);
173
				return defaultDate;
174
			}
175
		} catch (SolrServerException | IOException e) {
176
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile", e);
177
		}
178
	}
179

    
180
	private String getDefaultLastIndexingDate() {
181
		final Calendar cal = Calendar.getInstance();
182
		cal.add(Calendar.MONTH, -1);
183
		return DateFormatUtils.format(cal.getTime(), DatasourceFunctions.DATE_FORMAT);
184
	}
185

    
186
	private synchronized SolrClient getIndexClient(final IndexDsInfo info) {
187
		if (!indexClientMap.containsKey(info.getColl())) {
188

    
189
			final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build();
190
			client.setDefaultCollection(info.getColl());
191

    
192
			indexClientMap.put(info.getColl(), client);
193
		}
194
		return indexClientMap.get(info.getColl());
195
	}
196

    
197
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
198
		try {
199
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
200
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
201
			}
202
		} catch (final InterruptedException e) {
203
			errors.offer(e);
204
		}
205
	}
206

    
207
}
(3-3/6)