Project

General

Profile

« Previous | Next » 

Revision 49817

adopted solrj:7.1.0, implemented new way to fetch the lastIndexingDate

View differences:

DatasourceIndexClient.java
1 1
package eu.dnetlib.openaire.exporter.datasource.clients;
2 2

  
3 3
import java.io.IOException;
4
import java.util.Date;
5
import java.util.List;
4
import java.util.Calendar;
6 5
import java.util.Map;
7 6
import java.util.Queue;
8 7
import java.util.concurrent.*;
......
12 11
import com.google.common.collect.Iterables;
13 12
import com.google.common.util.concurrent.*;
14 13
import eu.dnetlib.OpenaireExporterConfig;
15
import eu.dnetlib.data.index.CloudIndexClient;
16
import eu.dnetlib.data.index.CloudIndexClientException;
17
import eu.dnetlib.data.index.CloudIndexClientFactory;
18 14
import eu.dnetlib.miscutils.functional.hash.Hashing;
19 15
import eu.dnetlib.openaire.exporter.datasource.ApiException;
20 16
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
......
25 21
import org.apache.commons.lang3.exception.ExceptionUtils;
26 22
import org.apache.commons.logging.Log;
27 23
import org.apache.commons.logging.LogFactory;
24
import org.apache.solr.client.solrj.SolrClient;
25
import org.apache.solr.client.solrj.SolrQuery;
26
import org.apache.solr.client.solrj.SolrServerException;
27
import org.apache.solr.client.solrj.impl.CloudSolrClient;
28
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
28 29
import org.apache.solr.client.solrj.response.QueryResponse;
29 30
import org.apache.solr.common.SolrDocument;
30 31
import org.springframework.beans.factory.annotation.Autowired;
32
import org.springframework.cache.annotation.Cacheable;
31 33
import org.springframework.http.HttpStatus;
32 34
import org.springframework.stereotype.Component;
33 35

  
......
47 49

  
48 50
	private ListeningExecutorService executor;
49 51

  
50
	private static Map<String, CloudIndexClient> indexClientMap = new ConcurrentHashMap<>();
52
	private static Map<String, SolrClient> indexClientMap = new ConcurrentHashMap<>();
51 53

  
52 54
	@PostConstruct
53 55
	public void init() {
......
70 72
	public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws ApiException {
71 73
		try {
72 74
			final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
73
			final CloudIndexClient indexClient = getIndexClient(info);
75
			final SolrClient indexClient = getIndexClient(info);
74 76
			final CountDownLatch latch = new CountDownLatch(2);
75 77
			final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
76 78

  
77 79
			Futures.addCallback(
78
				executor.submit(() -> setDateAndTotal(collectedFrom, indexClient)),
80
				executor.submit(() -> setTotal(collectedFrom, indexClient)),
79 81
						new FutureCallback<IndexRecordsInfo>() {
80 82
							@Override
81 83
							public void onSuccess(final IndexRecordsInfo info) {
......
119 121

  
120 122
	private Long setFunded(
121 123
			final String collectedFrom,
122
			final CloudIndexClient indexClient) throws ApiException {
123
		final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom);
124
			final SolrClient indexClient) throws ApiException {
125

  
126
		final SolrQuery query = new SolrQuery(
127
				String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom))
128
				.setRows(0);
129

  
124 130
		log.debug(String.format("query: %s", query));
125 131
		try {
126
			return indexClient.query(query, 0).getResults().getNumFound();
132
			return indexClient.query(query).getResults().getNumFound();
127 133
		} catch (Throwable e) {
128 134
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
129 135
		}
130 136
	}
131 137

  
132
	private IndexRecordsInfo setDateAndTotal(
138
	private IndexRecordsInfo setTotal(
133 139
			final String collectedFrom,
134
			final CloudIndexClient indexClient) throws ApiException {
140
			final SolrClient indexClient) throws ApiException {
135 141
		try {
136
			final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom);
142
			final SolrQuery query = new SolrQuery(
143
					String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom))
144
					.setRows(1);
137 145
			log.debug(String.format("query: %s", query));
138 146

  
139
			final QueryResponse rsp = indexClient.query(query, 1);
147
			final QueryResponse rsp = indexClient.query(query);
140 148
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
141 149
			if (log.isDebugEnabled()) {
142 150
				log.debug(String.format("got document %s", doc.toString()));
143 151
			}
144
			//			if (doc.isEmpty()) {
145
			//				throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find document matching query: %s", queryTotal));
146
			//			}
152

  
147 153
			return new IndexRecordsInfo()
148
					.setDate(getDate(doc))
154
					.setDate(getLastIndexingDate(indexClient))
149 155
					.setTotal(rsp.getResults().getNumFound());
150 156
		} catch (Throwable e) {
151 157
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
152 158
		}
153 159
	}
154 160

  
155
	private String getDate(final SolrDocument doc) throws ApiException {
156
		final List<Date> dsversion = (List<Date>) doc.get(DSVERSION);
157
		if (dsversion == null || dsversion.isEmpty()) {
158
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find %s in matched solr document", DSVERSION));
161
	@Cacheable("index-cache")
162
	public String getLastIndexingDate(final SolrClient indexClient) throws ApiException {
163
		try {
164
			final QueryResponse rsp = indexClient.query(new SolrQuery("oaftype:datasource").setRows(1));
165
			if (rsp.getResults().getNumFound() > 0) {
166
				final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
167

  
168
				final String dsversion = doc.get(DSVERSION).toString();
169
				return StringUtils.substringBefore(dsversion, "T");
170
			} else {
171
				final String defaultDate = getDefaultLastIndexingDate();
172
				log.debug("unable to find documents, defaulting to " + defaultDate);
173
				return defaultDate;
174
			}
175
		} catch (SolrServerException | IOException e) {
176
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile", e);
159 177
		}
160
		final Date date = Iterables.getLast(dsversion);
161

  
162
		return DateFormatUtils.format(date, DatasourceFunctions.DATE_FORMAT);
163 178
	}
164 179

  
165
	public String getLastIndexingDate(final IndexDsInfo info) throws ApiException {
166
		try {
167
			final QueryResponse rsp = getIndexClient(info).query("oaftype:datasource", 1);
168
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
169
			final String dsversion = doc.get("__dsversion").toString();
170
			return StringUtils.substringBefore(dsversion, "T");
171
		} catch (CloudIndexClientException e) {
172
			throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile: " + info, e);
173
		}
180
	private String getDefaultLastIndexingDate() {
181
		final Calendar cal = Calendar.getInstance();
182
		cal.add(Calendar.MONTH, -1);
183
		return DateFormatUtils.format(cal.getTime(), DatasourceFunctions.DATE_FORMAT);
174 184
	}
175 185

  
176
	private synchronized CloudIndexClient getIndexClient(final IndexDsInfo info) throws CloudIndexClientException {
186
	private synchronized SolrClient getIndexClient(final IndexDsInfo info) {
177 187
		if (!indexClientMap.containsKey(info.getColl())) {
178
			indexClientMap.put(info.getColl(), CloudIndexClientFactory.newIndexClient(info.getIndexBaseUrl(), info.getColl(), false));
188

  
189
			final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build();
190
			client.setDefaultCollection(info.getColl());
191

  
192
			indexClientMap.put(info.getColl(), client);
179 193
		}
180 194
		return indexClientMap.get(info.getColl());
181 195
	}

Also available in: Unified diff