Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource.clients;
2

    
3
import java.io.IOException;
4
import java.util.Calendar;
5
import java.util.Map;
6
import java.util.Queue;
7
import java.util.concurrent.*;
8
import javax.annotation.PostConstruct;
9
import javax.annotation.PreDestroy;
10

    
11
import com.google.common.collect.Iterables;
12
import com.google.common.util.concurrent.*;
13
import eu.dnetlib.OpenaireExporterConfig;
14
import eu.dnetlib.data.index.CloudIndexClient;
15
import eu.dnetlib.data.index.CloudIndexClientException;
16
import eu.dnetlib.data.index.CloudIndexClientFactory;
17
import eu.dnetlib.miscutils.functional.hash.Hashing;
18
import eu.dnetlib.openaire.exporter.datasource.ApiException;
19
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
20
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
21
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
22
import org.apache.commons.lang.StringUtils;
23
import org.apache.commons.lang.time.DateFormatUtils;
24
import org.apache.commons.lang3.exception.ExceptionUtils;
25
import org.apache.commons.logging.Log;
26
import org.apache.commons.logging.LogFactory;
27
import org.apache.solr.client.solrj.response.QueryResponse;
28
import org.apache.solr.common.SolrDocument;
29
import org.springframework.beans.factory.annotation.Autowired;
30
import org.springframework.cache.annotation.Cacheable;
31
import org.springframework.http.HttpStatus;
32
import org.springframework.stereotype.Component;
33

    
34
/**
35
 * Created by claudio on 20/10/2016.
36
 */
37
@Component
38
public class DatasourceIndexClient {
39

    
40
	private static final Log log = LogFactory.getLog(DatasourceIndexClient.class);
41

    
42
	public static final String SEPARATOR = "::";
43
	public static final String DSVERSION = "__dsversion";
44

    
45
	@Autowired
46
	private OpenaireExporterConfig config;
47

    
48
	private ListeningExecutorService executor;
49

    
50
	private static Map<String, CloudIndexClient> indexClientMap = new ConcurrentHashMap<>();
51

    
52
	@PostConstruct
53
	public void init() {
54
		executor = MoreExecutors.listeningDecorator(
55
				new ScheduledThreadPoolExecutor(2,
56
						new ThreadFactoryBuilder().setNameFormat("datasource-index-client-%d").build()));
57
	}
58

    
59
	@PreDestroy
60
	public void tearDown() {
61
		indexClientMap.forEach((name, client) -> {
62
			try {
63
				client.close();
64
			} catch (IOException e) {
65
				log.warn(String.format("unable to gracefully shutdown client for index %s", name));
66
			}
67
		});
68
	}
69

    
70
	public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws ApiException {
71
		try {
72
			final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
73
			final CloudIndexClient indexClient = getIndexClient(info);
74
			final CountDownLatch latch = new CountDownLatch(2);
75
			final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
76

    
77
			Futures.addCallback(
78
				executor.submit(() -> setDateAndTotal(collectedFrom, indexClient)),
79
						new FutureCallback<IndexRecordsInfo>() {
80
							@Override
81
							public void onSuccess(final IndexRecordsInfo info) {
82
								indexRecordInfo
83
										.setTotal(info.getTotal())
84
										.setDate(info.getDate());
85
								latch.countDown();
86
							}
87

    
88
							@Override
89
							public void onFailure(final Throwable e) {
90
								log.error(ExceptionUtils.getStackTrace(e));
91
								errors.offer(e);
92
								latch.countDown();
93
							}
94
						});
95

    
96
			Futures.addCallback(
97
					executor.submit(() -> setFunded(collectedFrom, indexClient)),
98
					new FutureCallback<Long>() {
99
						@Override
100
						public void onSuccess(final Long numFound) {
101
							indexRecordInfo.setFunded(numFound);
102
							latch.countDown();
103
						}
104

    
105
						@Override
106
						public void onFailure(final Throwable e) {
107
							log.error(ExceptionUtils.getStackTrace(e));
108
							errors.offer(e);
109
							latch.countDown();
110
						}
111
					});
112

    
113
			waitLatch(latch, errors, config.getRequestTimeout());
114
			return indexRecordInfo;
115
		} catch (final Throwable e) {
116
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying publications from: " + dsId, e);
117
		}
118
	}
119

    
120
	private Long setFunded(
121
			final String collectedFrom,
122
			final CloudIndexClient indexClient) throws ApiException {
123

    
124
		//TODO consider using relfunderid:* to identify the funded content
125
		final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom);
126
		log.debug(String.format("query: %s", query));
127
		try {
128
			return indexClient.query(query, 0).getResults().getNumFound();
129
		} catch (Throwable e) {
130
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
131
		}
132
	}
133

    
134
	private IndexRecordsInfo setDateAndTotal(
135
			final String collectedFrom,
136
			final CloudIndexClient indexClient) throws ApiException {
137
		try {
138
			final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom);
139
			log.debug(String.format("query: %s", query));
140

    
141
			final QueryResponse rsp = indexClient.query(query, 1);
142
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
143
			if (log.isDebugEnabled()) {
144
				log.debug(String.format("got document %s", doc.toString()));
145
			}
146
			//			if (doc.isEmpty()) {
147
			//				throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find document matching query: %s", queryTotal));
148
			//			}
149
			return new IndexRecordsInfo()
150
					.setDate(getLastIndexingDate(indexClient))
151
					.setTotal(rsp.getResults().getNumFound());
152
		} catch (Throwable e) {
153
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
154
		}
155
	}
156

    
157
	@Cacheable("index-cache")
158
	public String getLastIndexingDate(final CloudIndexClient indexClient) throws ApiException {
159
		try {
160
			final QueryResponse rsp = indexClient.query("oaftype:datasource", 1);
161
			if (rsp.getResults().getNumFound() > 0) {
162
				final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
163

    
164
				final String dsversion = doc.get(DSVERSION).toString();
165
				return StringUtils.substringBefore(dsversion, "T");
166
			} else {
167
				final String defaultDate = getDefaultLastIndexingDate();
168
				log.debug("unable to find documents, defaulting to " + defaultDate);
169
				return defaultDate;
170
			}
171
		} catch (CloudIndexClientException e) {
172
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile", e);
173
		}
174
	}
175

    
176
	private String getDefaultLastIndexingDate() {
177
		final Calendar cal = Calendar.getInstance();
178
		cal.add(Calendar.MONTH, -1);
179
		return DateFormatUtils.format(cal.getTime(), DatasourceFunctions.DATE_FORMAT);
180
	}
181

    
182
	private synchronized CloudIndexClient getIndexClient(final IndexDsInfo info) throws CloudIndexClientException {
183
		if (!indexClientMap.containsKey(info.getColl())) {
184
			indexClientMap.put(info.getColl(), CloudIndexClientFactory.newIndexClient(info.getIndexBaseUrl(), info.getColl(), false));
185
		}
186
		return indexClientMap.get(info.getColl());
187
	}
188

    
189
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
190
		try {
191
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
192
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
193
			}
194
		} catch (final InterruptedException e) {
195
			errors.offer(e);
196
		}
197
	}
198

    
199
}
(3-3/6)