Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource.clients;
2

    
3
import java.io.IOException;
4
import java.util.Date;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.Queue;
8
import java.util.concurrent.*;
9
import javax.annotation.PostConstruct;
10
import javax.annotation.PreDestroy;
11

    
12
import com.google.common.collect.Iterables;
13
import com.google.common.util.concurrent.*;
14
import eu.dnetlib.OpenaireExporterConfig;
15
import eu.dnetlib.enabling.datasources.common.DsmException;
16
import eu.dnetlib.miscutils.functional.hash.Hashing;
17
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
18
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
19
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
20
import org.apache.commons.lang.StringUtils;
21
import org.apache.commons.lang.time.DateFormatUtils;
22
import org.apache.commons.logging.Log;
23
import org.apache.commons.logging.LogFactory;
24
import org.apache.solr.client.solrj.SolrQuery;
25
import org.apache.solr.client.solrj.SolrServerException;
26
import org.apache.solr.client.solrj.impl.CloudSolrClient;
27
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
28
import org.apache.solr.client.solrj.response.QueryResponse;
29
import org.apache.solr.common.SolrDocument;
30
import org.springframework.beans.factory.annotation.Autowired;
31
import org.springframework.http.HttpStatus;
32
import org.springframework.stereotype.Component;
33

    
34
/**
35
 * Created by claudio on 20/10/2016.
36
 */
37
@Component
38
public class DatasourceIndexClientImpl implements DatasourceIndexClient {
39

    
40
	private static final Log log = LogFactory.getLog(DatasourceIndexClientImpl.class);
41

    
42
	public static final String SEPARATOR = "::";
43
	public static final String DSVERSION = "__dsversion";
44

    
45
	@Autowired
46
	private OpenaireExporterConfig config;
47

    
48
	private ListeningExecutorService executor;
49

    
50
	private static Map<String, CloudSolrClient> indexClientMap = new ConcurrentHashMap<>();
51

    
52
	@PostConstruct
53
	public void init() {
54
		executor = MoreExecutors.listeningDecorator(
55
				new ScheduledThreadPoolExecutor(5,
56
						new ThreadFactoryBuilder().setNameFormat("datasource-index-client-%d").build()));
57
	}
58

    
59
	@PreDestroy
60
	public void tearDown() {
61
		indexClientMap.forEach((name, client) -> {
62
			try {
63
				client.close();
64
			} catch (IOException e) {
65
				log.warn(String.format("unable to gracefully shutdown client for index %s", name));
66
			}
67
		});
68
	}
69

    
70
	@Override
71
	public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws DsmException {
72
		try {
73
			final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
74
			final CloudSolrClient indexClient = getIndexClient(info);
75
			final CountDownLatch latch = new CountDownLatch(2);
76
			final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
77

    
78
			Futures.addCallback(
79
					executor.submit(() -> setDateAndTotal(collectedFrom, indexClient)),
80
					new FutureCallback<IndexRecordsInfo>() {
81

    
82
						@Override
83
						public void onSuccess(final IndexRecordsInfo info) {
84
							indexRecordInfo
85
									.setTotal(info.getTotal())
86
									.setDate(info.getDate());
87
							latch.countDown();
88
						}
89

    
90
						@Override
91
						public void onFailure(final Throwable e) {
92
							errors.offer(e);
93
							latch.countDown();
94
						}
95
					}, executor);
96

    
97
			Futures.addCallback(
98
					executor.submit(() -> setFunded(collectedFrom, indexClient)),
99
					new FutureCallback<Long>() {
100

    
101
						@Override
102
						public void onSuccess(final Long numFound) {
103
							indexRecordInfo.setFunded(numFound);
104
							latch.countDown();
105
						}
106

    
107
						@Override
108
						public void onFailure(final Throwable e) {
109
							errors.offer(e);
110
							latch.countDown();
111
						}
112
					}, executor);
113

    
114
			waitLatch(latch, errors, config.getRequestTimeout());
115
			return indexRecordInfo;
116
		} catch (final Throwable e) {
117
			throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), e.getMessage(), e);
118
		}
119
	}
120

    
121
	@Override
122
	public String getLastIndexingDate(final IndexDsInfo info) throws DsmException {
123
		try {
124
			final SolrQuery query = new SolrQuery("oaftype:datasource").setRows(1);
125
			final QueryResponse rsp = getIndexClient(info).query(query);
126
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
127
			final String dsversion = doc.get("__dsversion").toString();
128
			return StringUtils.substringBefore(dsversion, "T");
129
		} catch (SolrServerException | IOException e) {
130
			throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile: " + info, e);
131
		}
132
	}
133

    
134
	private Long setFunded(
135
			final String collectedFrom,
136
			final CloudSolrClient indexClient) throws DsmException {
137
		final String query =
138
				String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom);
139
		log.debug(String.format("query: %s", query));
140
		try {
141
			return indexClient.query(new SolrQuery(query).setRows(0)).getResults().getNumFound();
142
		} catch (Throwable e) {
143
			throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
144
		}
145
	}
146

    
147
	private IndexRecordsInfo setDateAndTotal(
148
			final String collectedFrom,
149
			final CloudSolrClient indexClient) throws DsmException {
150
		try {
151
			final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom);
152
			log.debug(String.format("query: %s", query));
153

    
154
			final QueryResponse rsp = indexClient.query(new SolrQuery(query).setRows(1));
155
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
156
			if (log.isDebugEnabled()) {
157
				log.debug(String.format("got document %s", doc.get("__indexrecordidentifier")));
158
			}
159
			// if (doc.isEmpty()) {
160
			// throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find document matching
161
			// query: %s", queryTotal));
162
			// }
163
			return new IndexRecordsInfo()
164
					.setDate(getDate(doc))
165
					.setTotal(rsp.getResults().getNumFound());
166
		} catch (Throwable e) {
167
			throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
168
		}
169
	}
170

    
171
	@SuppressWarnings("unchecked")
172
	private String getDate(final SolrDocument doc) throws DsmException {
173
		final List<Date> dsversion = (List<Date>) doc.get(DSVERSION);
174
		if (dsversion == null || dsversion.isEmpty()) { throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(),
175
				String.format("cannot find %s in matched solr document", DSVERSION)); }
176
		final Date date = Iterables.getLast(dsversion);
177

    
178
		return DateFormatUtils.format(date, DatasourceFunctions.DATE_FORMAT);
179
	}
180

    
181
	private synchronized CloudSolrClient getIndexClient(final IndexDsInfo info) {
182
		if (!indexClientMap.containsKey(info.getColl())) {
183

    
184
			final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build();
185
			client.setDefaultCollection(info.getColl());
186

    
187
			indexClientMap.put(info.getColl(), client);
188
		}
189
		return indexClientMap.get(info.getColl());
190
	}
191

    
192
	private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
193
		try {
194
			if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
195
				errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
196
			}
197
		} catch (final InterruptedException e) {
198
			errors.offer(e);
199
		}
200
	}
201

    
202
}
(4-4/13)