Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource.clients;
2

    
3
import java.io.IOException;
4
import java.util.Calendar;
5
import java.util.Date;
6
import java.util.Map;
7
import java.util.Queue;
8
import java.util.concurrent.*;
9
import javax.annotation.PostConstruct;
10
import javax.annotation.PreDestroy;
11

    
12
import com.google.common.collect.Iterables;
13
import com.google.common.util.concurrent.*;
14
import eu.dnetlib.OpenaireExporterConfig;
15
import eu.dnetlib.miscutils.functional.hash.Hashing;
16
import eu.dnetlib.openaire.exporter.datasource.ApiException;
17
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
18
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
19
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
20
import org.apache.commons.lang.StringUtils;
21
import org.apache.commons.lang.time.DateFormatUtils;
22
import org.apache.commons.lang3.exception.ExceptionUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.apache.solr.client.solrj.SolrClient;
26
import org.apache.solr.client.solrj.SolrQuery;
27
import org.apache.solr.client.solrj.SolrServerException;
28
import org.apache.solr.client.solrj.impl.CloudSolrClient;
29
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
30
import org.apache.solr.client.solrj.response.QueryResponse;
31
import org.apache.solr.common.SolrDocument;
32
import org.springframework.beans.factory.annotation.Autowired;
33
import org.springframework.cache.annotation.Cacheable;
34
import org.springframework.http.HttpStatus;
35
import org.springframework.stereotype.Component;
36

    
37
/**
38
 * Created by claudio on 20/10/2016.
39
 */
40
@Component
41
public class DatasourceIndexClient {
42

    
43
	private static final Log log = LogFactory.getLog(DatasourceIndexClient.class);
44

    
45
	public static final String SEPARATOR = "::";
46
	public static final String DSVERSION = "__dsversion";
47

    
48
	@Autowired
49
	private OpenaireExporterConfig config;
50

    
51
	private ListeningExecutorService executor;
52

    
53
	private static Map<String, SolrClient> indexClientMap = new ConcurrentHashMap<>();
54

    
55
	@PostConstruct
56
	public void init() {
57
		executor = MoreExecutors.listeningDecorator(
58
				new ScheduledThreadPoolExecutor(2,
59
						new ThreadFactoryBuilder().setNameFormat("datasource-index-client-%d").build()));
60
	}
61

    
62
	@PreDestroy
63
	public void tearDown() {
64
		indexClientMap.forEach((name, client) -> {
65
			try {
66
				client.close();
67
			} catch (IOException e) {
68
				log.warn(String.format("unable to gracefully shutdown client for index %s", name));
69
			}
70
		});
71
	}
72

    
73
	public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws ApiException {
74
		try {
75
			final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
76
			final SolrClient indexClient = getIndexClient(info);
77
			final CountDownLatch latch = new CountDownLatch(2);
78
			final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
79

    
80
			Futures.addCallback(
81
				executor.submit(() -> setTotal(collectedFrom, indexClient)),
82
						new FutureCallback<IndexRecordsInfo>() {
83
							@Override
84
							public void onSuccess(final IndexRecordsInfo info) {
85
								indexRecordInfo
86
										.setTotal(info.getTotal())
87
										.setDate(info.getDate());
88
								latch.countDown();
89
							}
90

    
91
							@Override
92
							public void onFailure(final Throwable e) {
93
								log.error(ExceptionUtils.getStackTrace(e));
94
								errors.offer(e);
95
								latch.countDown();
96
							}
97
						});
98

    
99
			Futures.addCallback(
100
					executor.submit(() -> setFunded(collectedFrom, indexClient)),
101
					new FutureCallback<Long>() {
102
						@Override
103
						public void onSuccess(final Long numFound) {
104
							indexRecordInfo.setFunded(numFound);
105
							latch.countDown();
106
						}
107

    
108
						@Override
109
						public void onFailure(final Throwable e) {
110
							log.error(ExceptionUtils.getStackTrace(e));
111
							errors.offer(e);
112
							latch.countDown();
113
						}
114
					});
115

    
116
			waitLatch(latch, errors, config.getRequestTimeout());
117
			return indexRecordInfo;
118
		} catch (final Throwable e) {
119
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying publications from: " + dsId, e);
120
		}
121
	}
122

    
123
	private Long setFunded(
124
			final String collectedFrom,
125
			final SolrClient indexClient) throws ApiException {
126

    
127
		final SolrQuery query = new SolrQuery(
128
				String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom))
129
				.setRows(0);
130

    
131
		log.debug(String.format("query: %s", query));
132
		try {
133
			return indexClient.query(query).getResults().getNumFound();
134
		} catch (Throwable e) {
135
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
136
		}
137
	}
138

    
139
	private IndexRecordsInfo setTotal(
140
			final String collectedFrom,
141
			final SolrClient indexClient) throws ApiException {
142
		try {
143
			final SolrQuery query = new SolrQuery(
144
					String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom))
145
					.setRows(1);
146
			log.debug(String.format("query: %s", query));
147

    
148
			final QueryResponse rsp = indexClient.query(query);
149
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
150
			if (log.isDebugEnabled()) {
151
				log.debug(String.format("got document %s", doc.get("__indexrecordidentifier")));
152
			}
153

    
154
			return new IndexRecordsInfo()
155
					.setDate(getLastIndexingDate(indexClient))
156
					.setTotal(rsp.getResults().getNumFound());
157
		} catch (Throwable e) {
158
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
159
		}
160
	}
161

    
162
	@Cacheable("index-cache")
163
	public String getLastIndexingDate(final SolrClient indexClient) throws ApiException {
164
		try {
165
			final QueryResponse rsp = indexClient.query(new SolrQuery("oaftype:datasource").setRows(1));
166
			if (rsp.getResults().getNumFound() > 0) {
167
				final SolrDocument doc = rsp.getResults().get(0);
168

    
169
				final Date dsversion = (Date) doc.getFirstValue(DSVERSION);
170
				log.debug("got __dsversion: " + dsversion);
171
				return DateFormatUtils.format(dsversion, DatasourceFunctions.DATE_FORMAT);
172
			} else {
173
				final String defaultDate = getDefaultLastIndexingDate();
174
				log.debug("unable to find documents, defaulting to " + defaultDate);
175
				return defaultDate;
176
			}
177
		} catch (SolrServerException | IOException e) {
178
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile", e);
179
		}
180
	}
181

    
182
	private String getDefaultLastIndexingDate() {
183
		final Calendar cal = Calendar.getInstance();
184
		cal.add(Calendar.MONTH, -1);
185
		return DateFormatUtils.format(cal.getTime(), DatasourceFunctions.DATE_FORMAT);
186
	}
187

    
188
	private synchronized SolrClient getIndexClient(final IndexDsInfo info) {
189
		if (!indexClientMap.containsKey(info.getColl())) {
190

    
191
			final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build();
192
			client.setDefaultCollection(info.getColl());
193

    
194
			indexClientMap.put(info.getColl(), client);
195
		}
196
		return indexClientMap.get(info.getColl());
197
	}
198

    
199
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
200
		try {
201
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
202
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
203
			}
204
		} catch (final InterruptedException e) {
205
			errors.offer(e);
206
		}
207
	}
208

    
209
}
(2-2/6)