Revision 49817
Added by Claudio Atzori over 6 years ago
DatasourceIndexClient.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.openaire.exporter.datasource.clients; |
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 |
import java.util.Date; |
|
5 |
import java.util.List; |
|
4 |
import java.util.Calendar; |
|
6 | 5 |
import java.util.Map; |
7 | 6 |
import java.util.Queue; |
8 | 7 |
import java.util.concurrent.*; |
... | ... | |
12 | 11 |
import com.google.common.collect.Iterables; |
13 | 12 |
import com.google.common.util.concurrent.*; |
14 | 13 |
import eu.dnetlib.OpenaireExporterConfig; |
15 |
import eu.dnetlib.data.index.CloudIndexClient; |
|
16 |
import eu.dnetlib.data.index.CloudIndexClientException; |
|
17 |
import eu.dnetlib.data.index.CloudIndexClientFactory; |
|
18 | 14 |
import eu.dnetlib.miscutils.functional.hash.Hashing; |
19 | 15 |
import eu.dnetlib.openaire.exporter.datasource.ApiException; |
20 | 16 |
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions; |
... | ... | |
25 | 21 |
import org.apache.commons.lang3.exception.ExceptionUtils; |
26 | 22 |
import org.apache.commons.logging.Log; |
27 | 23 |
import org.apache.commons.logging.LogFactory; |
24 |
import org.apache.solr.client.solrj.SolrClient; |
|
25 |
import org.apache.solr.client.solrj.SolrQuery; |
|
26 |
import org.apache.solr.client.solrj.SolrServerException; |
|
27 |
import org.apache.solr.client.solrj.impl.CloudSolrClient; |
|
28 |
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder; |
|
28 | 29 |
import org.apache.solr.client.solrj.response.QueryResponse; |
29 | 30 |
import org.apache.solr.common.SolrDocument; |
30 | 31 |
import org.springframework.beans.factory.annotation.Autowired; |
32 |
import org.springframework.cache.annotation.Cacheable; |
|
31 | 33 |
import org.springframework.http.HttpStatus; |
32 | 34 |
import org.springframework.stereotype.Component; |
33 | 35 |
|
... | ... | |
47 | 49 |
|
48 | 50 |
private ListeningExecutorService executor; |
49 | 51 |
|
50 |
private static Map<String, CloudIndexClient> indexClientMap = new ConcurrentHashMap<>();
|
|
52 |
private static Map<String, SolrClient> indexClientMap = new ConcurrentHashMap<>();
|
|
51 | 53 |
|
52 | 54 |
@PostConstruct |
53 | 55 |
public void init() { |
... | ... | |
70 | 72 |
public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws ApiException { |
71 | 73 |
try { |
72 | 74 |
final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR)); |
73 |
final CloudIndexClient indexClient = getIndexClient(info);
|
|
75 |
final SolrClient indexClient = getIndexClient(info);
|
|
74 | 76 |
final CountDownLatch latch = new CountDownLatch(2); |
75 | 77 |
final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo(); |
76 | 78 |
|
77 | 79 |
Futures.addCallback( |
78 |
executor.submit(() -> setDateAndTotal(collectedFrom, indexClient)),
|
|
80 |
executor.submit(() -> setTotal(collectedFrom, indexClient)), |
|
79 | 81 |
new FutureCallback<IndexRecordsInfo>() { |
80 | 82 |
@Override |
81 | 83 |
public void onSuccess(final IndexRecordsInfo info) { |
... | ... | |
119 | 121 |
|
120 | 122 |
private Long setFunded( |
121 | 123 |
final String collectedFrom, |
122 |
final CloudIndexClient indexClient) throws ApiException { |
|
123 |
final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom); |
|
124 |
final SolrClient indexClient) throws ApiException { |
|
125 |
|
|
126 |
final SolrQuery query = new SolrQuery( |
|
127 |
String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom)) |
|
128 |
.setRows(0); |
|
129 |
|
|
124 | 130 |
log.debug(String.format("query: %s", query)); |
125 | 131 |
try { |
126 |
return indexClient.query(query, 0).getResults().getNumFound();
|
|
132 |
return indexClient.query(query).getResults().getNumFound(); |
|
127 | 133 |
} catch (Throwable e) { |
128 | 134 |
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e); |
129 | 135 |
} |
130 | 136 |
} |
131 | 137 |
|
132 |
private IndexRecordsInfo setDateAndTotal(
|
|
138 |
private IndexRecordsInfo setTotal( |
|
133 | 139 |
final String collectedFrom, |
134 |
final CloudIndexClient indexClient) throws ApiException {
|
|
140 |
final SolrClient indexClient) throws ApiException {
|
|
135 | 141 |
try { |
136 |
final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom); |
|
142 |
final SolrQuery query = new SolrQuery( |
|
143 |
String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom)) |
|
144 |
.setRows(1); |
|
137 | 145 |
log.debug(String.format("query: %s", query)); |
138 | 146 |
|
139 |
final QueryResponse rsp = indexClient.query(query, 1);
|
|
147 |
final QueryResponse rsp = indexClient.query(query); |
|
140 | 148 |
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument()); |
141 | 149 |
if (log.isDebugEnabled()) { |
142 | 150 |
log.debug(String.format("got document %s", doc.toString())); |
143 | 151 |
} |
144 |
// if (doc.isEmpty()) { |
|
145 |
// throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find document matching query: %s", queryTotal)); |
|
146 |
// } |
|
152 |
|
|
147 | 153 |
return new IndexRecordsInfo() |
148 |
.setDate(getDate(doc))
|
|
154 |
.setDate(getLastIndexingDate(indexClient))
|
|
149 | 155 |
.setTotal(rsp.getResults().getNumFound()); |
150 | 156 |
} catch (Throwable e) { |
151 | 157 |
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e); |
152 | 158 |
} |
153 | 159 |
} |
154 | 160 |
|
155 |
private String getDate(final SolrDocument doc) throws ApiException { |
|
156 |
final List<Date> dsversion = (List<Date>) doc.get(DSVERSION); |
|
157 |
if (dsversion == null || dsversion.isEmpty()) { |
|
158 |
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find %s in matched solr document", DSVERSION)); |
|
161 |
@Cacheable("index-cache") |
|
162 |
public String getLastIndexingDate(final SolrClient indexClient) throws ApiException { |
|
163 |
try { |
|
164 |
final QueryResponse rsp = indexClient.query(new SolrQuery("oaftype:datasource").setRows(1)); |
|
165 |
if (rsp.getResults().getNumFound() > 0) { |
|
166 |
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null); |
|
167 |
|
|
168 |
final String dsversion = doc.get(DSVERSION).toString(); |
|
169 |
return StringUtils.substringBefore(dsversion, "T"); |
|
170 |
} else { |
|
171 |
final String defaultDate = getDefaultLastIndexingDate(); |
|
172 |
log.debug("unable to find documents, defaulting to " + defaultDate); |
|
173 |
return defaultDate; |
|
174 |
} |
|
175 |
} catch (SolrServerException | IOException e) { |
|
176 |
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile", e); |
|
159 | 177 |
} |
160 |
final Date date = Iterables.getLast(dsversion); |
|
161 |
|
|
162 |
return DateFormatUtils.format(date, DatasourceFunctions.DATE_FORMAT); |
|
163 | 178 |
} |
164 | 179 |
|
165 |
public String getLastIndexingDate(final IndexDsInfo info) throws ApiException { |
|
166 |
try { |
|
167 |
final QueryResponse rsp = getIndexClient(info).query("oaftype:datasource", 1); |
|
168 |
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null); |
|
169 |
final String dsversion = doc.get("__dsversion").toString(); |
|
170 |
return StringUtils.substringBefore(dsversion, "T"); |
|
171 |
} catch (CloudIndexClientException e) { |
|
172 |
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile: " + info, e); |
|
173 |
} |
|
180 |
private String getDefaultLastIndexingDate() { |
|
181 |
final Calendar cal = Calendar.getInstance(); |
|
182 |
cal.add(Calendar.MONTH, -1); |
|
183 |
return DateFormatUtils.format(cal.getTime(), DatasourceFunctions.DATE_FORMAT); |
|
174 | 184 |
} |
175 | 185 |
|
176 |
private synchronized CloudIndexClient getIndexClient(final IndexDsInfo info) throws CloudIndexClientException {
|
|
186 |
private synchronized SolrClient getIndexClient(final IndexDsInfo info) {
|
|
177 | 187 |
if (!indexClientMap.containsKey(info.getColl())) { |
178 |
indexClientMap.put(info.getColl(), CloudIndexClientFactory.newIndexClient(info.getIndexBaseUrl(), info.getColl(), false)); |
|
188 |
|
|
189 |
final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build(); |
|
190 |
client.setDefaultCollection(info.getColl()); |
|
191 |
|
|
192 |
indexClientMap.put(info.getColl(), client); |
|
179 | 193 |
} |
180 | 194 |
return indexClientMap.get(info.getColl()); |
181 | 195 |
} |
Also available in: Unified diff
adopted solrj:7.1.0, implemented new way to fetch the lastIndexingDate