Revision 50143
Added by Claudio Atzori over 6 years ago
DatasourceIndexClient.java | ||
---|---|---|
5 | 5 |
import java.util.List; |
6 | 6 |
import java.util.Map; |
7 | 7 |
import java.util.Queue; |
8 |
import java.util.concurrent.ConcurrentHashMap; |
|
9 |
import java.util.concurrent.CountDownLatch; |
|
10 |
import java.util.concurrent.ScheduledThreadPoolExecutor; |
|
11 |
import java.util.concurrent.TimeUnit; |
|
12 |
import java.util.concurrent.TimeoutException; |
|
13 |
|
|
8 |
import java.util.concurrent.*; |
|
14 | 9 |
import javax.annotation.PostConstruct; |
15 | 10 |
import javax.annotation.PreDestroy; |
16 | 11 |
|
12 |
import com.google.common.collect.Iterables; |
|
13 |
import com.google.common.util.concurrent.*; |
|
14 |
import eu.dnetlib.OpenaireExporterConfig; |
|
15 |
import eu.dnetlib.enabling.datasources.common.DatasourceManagerException; |
|
16 |
import eu.dnetlib.miscutils.functional.hash.Hashing; |
|
17 |
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions; |
|
18 |
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo; |
|
19 |
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo; |
|
17 | 20 |
import org.apache.commons.lang.StringUtils; |
18 | 21 |
import org.apache.commons.lang.time.DateFormatUtils; |
19 |
import org.apache.commons.lang3.exception.ExceptionUtils; |
|
20 | 22 |
import org.apache.commons.logging.Log; |
21 | 23 |
import org.apache.commons.logging.LogFactory; |
24 |
import org.apache.solr.client.solrj.SolrQuery; |
|
25 |
import org.apache.solr.client.solrj.SolrServerException; |
|
26 |
import org.apache.solr.client.solrj.impl.CloudSolrClient; |
|
27 |
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder; |
|
22 | 28 |
import org.apache.solr.client.solrj.response.QueryResponse; |
23 | 29 |
import org.apache.solr.common.SolrDocument; |
24 | 30 |
import org.springframework.beans.factory.annotation.Autowired; |
25 | 31 |
import org.springframework.http.HttpStatus; |
26 | 32 |
import org.springframework.stereotype.Component; |
27 | 33 |
|
28 |
import com.google.common.collect.Iterables; |
|
29 |
import com.google.common.util.concurrent.FutureCallback; |
|
30 |
import com.google.common.util.concurrent.Futures; |
|
31 |
import com.google.common.util.concurrent.ListeningExecutorService; |
|
32 |
import com.google.common.util.concurrent.MoreExecutors; |
|
33 |
import com.google.common.util.concurrent.ThreadFactoryBuilder; |
|
34 |
|
|
35 |
import eu.dnetlib.OpenaireExporterConfig; |
|
36 |
import eu.dnetlib.data.index.CloudIndexClient; |
|
37 |
import eu.dnetlib.data.index.CloudIndexClientException; |
|
38 |
import eu.dnetlib.data.index.CloudIndexClientFactory; |
|
39 |
import eu.dnetlib.enabling.datasources.common.DatasourceManagerException; |
|
40 |
import eu.dnetlib.miscutils.functional.hash.Hashing; |
|
41 |
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions; |
|
42 |
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo; |
|
43 |
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo; |
|
44 |
|
|
45 | 34 |
/** |
46 | 35 |
* Created by claudio on 20/10/2016. |
47 | 36 |
*/ |
... | ... | |
58 | 47 |
|
59 | 48 |
private ListeningExecutorService executor; |
60 | 49 |
|
61 |
private static Map<String, CloudIndexClient> indexClientMap = new ConcurrentHashMap<>();
|
|
50 |
private static Map<String, CloudSolrClient> indexClientMap = new ConcurrentHashMap<>();
|
|
62 | 51 |
|
63 | 52 |
@PostConstruct |
64 | 53 |
public void init() { |
... | ... | |
81 | 70 |
public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws DatasourceManagerException { |
82 | 71 |
try { |
83 | 72 |
final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR)); |
84 |
final CloudIndexClient indexClient = getIndexClient(info);
|
|
73 |
final CloudSolrClient indexClient = getIndexClient(info);
|
|
85 | 74 |
final CountDownLatch latch = new CountDownLatch(2); |
86 | 75 |
final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo(); |
87 | 76 |
|
... | ... | |
99 | 88 |
|
100 | 89 |
@Override |
101 | 90 |
public void onFailure(final Throwable e) { |
102 |
log.error(ExceptionUtils.getStackTrace(e)); |
|
103 | 91 |
errors.offer(e); |
104 | 92 |
latch.countDown(); |
105 | 93 |
} |
... | ... | |
117 | 105 |
|
118 | 106 |
@Override |
119 | 107 |
public void onFailure(final Throwable e) { |
120 |
log.error(ExceptionUtils.getStackTrace(e)); |
|
121 | 108 |
errors.offer(e); |
122 | 109 |
latch.countDown(); |
123 | 110 |
} |
... | ... | |
132 | 119 |
|
133 | 120 |
private Long setFunded( |
134 | 121 |
final String collectedFrom, |
135 |
final CloudIndexClient indexClient) throws DatasourceManagerException {
|
|
122 |
final CloudSolrClient indexClient) throws DatasourceManagerException {
|
|
136 | 123 |
final String query = |
137 | 124 |
String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom); |
138 | 125 |
log.debug(String.format("query: %s", query)); |
139 | 126 |
try { |
140 |
return indexClient.query(query, 0).getResults().getNumFound();
|
|
127 |
return indexClient.query(new SolrQuery(query).setRows(0)).getResults().getNumFound();
|
|
141 | 128 |
} catch (Throwable e) { |
142 | 129 |
throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e); |
143 | 130 |
} |
... | ... | |
145 | 132 |
|
146 | 133 |
private IndexRecordsInfo setDateAndTotal( |
147 | 134 |
final String collectedFrom, |
148 |
final CloudIndexClient indexClient) throws DatasourceManagerException {
|
|
135 |
final CloudSolrClient indexClient) throws DatasourceManagerException {
|
|
149 | 136 |
try { |
150 | 137 |
final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom); |
151 | 138 |
log.debug(String.format("query: %s", query)); |
152 | 139 |
|
153 |
final QueryResponse rsp = indexClient.query(query, 1);
|
|
140 |
final QueryResponse rsp = indexClient.query(new SolrQuery(query).setRows(1));
|
|
154 | 141 |
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument()); |
155 | 142 |
if (log.isDebugEnabled()) { |
156 |
log.debug(String.format("got document %s", doc.toString()));
|
|
143 |
log.debug(String.format("got document %s", doc.get("__indexrecordidentifier")));
|
|
157 | 144 |
} |
158 | 145 |
// if (doc.isEmpty()) { |
159 | 146 |
// throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find document matching |
... | ... | |
178 | 165 |
|
179 | 166 |
public String getLastIndexingDate(final IndexDsInfo info) throws DatasourceManagerException { |
180 | 167 |
try { |
181 |
final QueryResponse rsp = getIndexClient(info).query("oaftype:datasource", 1); |
|
168 |
final SolrQuery query = new SolrQuery("oaftype:datasource").setRows(1); |
|
169 |
final QueryResponse rsp = getIndexClient(info).query(query); |
|
182 | 170 |
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null); |
183 | 171 |
final String dsversion = doc.get("__dsversion").toString(); |
184 | 172 |
return StringUtils.substringBefore(dsversion, "T"); |
185 |
} catch (CloudIndexClientException e) {
|
|
173 |
} catch (SolrServerException | IOException e) {
|
|
186 | 174 |
throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile: " + info, e); |
187 | 175 |
} |
188 | 176 |
} |
189 | 177 |
|
190 |
private synchronized CloudIndexClient getIndexClient(final IndexDsInfo info) throws CloudIndexClientException {
|
|
178 |
private synchronized CloudSolrClient getIndexClient(final IndexDsInfo info) {
|
|
191 | 179 |
if (!indexClientMap.containsKey(info.getColl())) { |
192 |
indexClientMap.put(info.getColl(), CloudIndexClientFactory.newIndexClient(info.getIndexBaseUrl(), info.getColl(), false)); |
|
180 |
|
|
181 |
final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build(); |
|
182 |
client.setDefaultCollection(info.getColl()); |
|
183 |
|
|
184 |
indexClientMap.put(info.getColl(), client); |
|
193 | 185 |
} |
194 | 186 |
return indexClientMap.get(info.getColl()); |
195 | 187 |
} |
Also available in: Unified diff
huge update: swagger documentation, alignment of methods with the service trunk