1 |
1 |
package eu.dnetlib.openaire.dsm.dao;
|
2 |
2 |
|
3 |
|
import java.io.IOException;
|
4 |
|
import java.util.Date;
|
5 |
|
import java.util.List;
|
6 |
|
import java.util.Map;
|
7 |
|
import java.util.Queue;
|
8 |
|
import java.util.concurrent.*;
|
9 |
|
import javax.annotation.PostConstruct;
|
10 |
|
import javax.annotation.PreDestroy;
|
11 |
|
|
12 |
3 |
import com.google.common.collect.Iterables;
|
13 |
4 |
import com.google.common.util.concurrent.*;
|
14 |
5 |
import eu.dnetlib.OpenaireExporterConfig;
|
... | ... | |
23 |
14 |
import org.apache.commons.logging.LogFactory;
|
24 |
15 |
import org.apache.solr.client.solrj.SolrQuery;
|
25 |
16 |
import org.apache.solr.client.solrj.SolrServerException;
|
26 |
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
27 |
|
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
|
|
17 |
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
28 |
18 |
import org.apache.solr.client.solrj.response.QueryResponse;
|
29 |
19 |
import org.apache.solr.common.SolrDocument;
|
30 |
20 |
import org.springframework.beans.factory.annotation.Autowired;
|
... | ... | |
32 |
22 |
import org.springframework.http.HttpStatus;
|
33 |
23 |
import org.springframework.stereotype.Component;
|
34 |
24 |
|
|
25 |
import javax.annotation.PostConstruct;
|
|
26 |
import javax.annotation.PreDestroy;
|
|
27 |
import java.util.Date;
|
|
28 |
import java.util.List;
|
|
29 |
import java.util.Map;
|
|
30 |
import java.util.Queue;
|
|
31 |
import java.util.concurrent.*;
|
|
32 |
|
35 |
33 |
/**
|
36 |
34 |
* Created by claudio on 20/10/2016.
|
37 |
35 |
*/
|
... | ... | |
49 |
47 |
|
50 |
48 |
private ListeningExecutorService executor;
|
51 |
49 |
|
52 |
|
private static Map<String, CloudSolrClient> indexClientMap = new ConcurrentHashMap<>();
|
|
50 |
private static Map<String, CloudSolrServer> indexClientMap = new ConcurrentHashMap<>();
|
53 |
51 |
|
54 |
52 |
@PostConstruct
|
55 |
53 |
public void init() {
|
... | ... | |
61 |
59 |
@PreDestroy
|
62 |
60 |
public void tearDown() {
|
63 |
61 |
indexClientMap.forEach((name, client) -> {
|
64 |
|
try {
|
65 |
|
client.close();
|
66 |
|
} catch (IOException e) {
|
67 |
|
log.warn(String.format("unable to gracefully shutdown client for index %s", name));
|
68 |
|
}
|
|
62 |
client.shutdown();
|
69 |
63 |
});
|
70 |
64 |
}
|
71 |
65 |
|
... | ... | |
73 |
67 |
public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws DsmException {
|
74 |
68 |
try {
|
75 |
69 |
final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
|
76 |
|
final CloudSolrClient indexClient = getIndexClient(info);
|
|
70 |
final CloudSolrServer indexClient = getIndexClient(info);
|
77 |
71 |
final CountDownLatch latch = new CountDownLatch(2);
|
78 |
72 |
final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
|
79 |
73 |
|
... | ... | |
128 |
122 |
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
|
129 |
123 |
final String dsversion = doc.get("__dsversion").toString();
|
130 |
124 |
return StringUtils.substringBefore(dsversion, "T");
|
131 |
|
} catch (SolrServerException | IOException e) {
|
|
125 |
} catch (SolrServerException e) {
|
132 |
126 |
throw new DsmException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile: " + info, e);
|
133 |
127 |
}
|
134 |
128 |
}
|
... | ... | |
136 |
130 |
private Long setFunded(
|
137 |
131 |
final String dsId,
|
138 |
132 |
final String collectedFrom,
|
139 |
|
final CloudSolrClient indexClient) throws DsmException {
|
|
133 |
final CloudSolrServer indexClient) throws DsmException {
|
140 |
134 |
final String query =
|
141 |
135 |
String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom);
|
142 |
136 |
log.debug(String.format("query: %s", query));
|
... | ... | |
150 |
144 |
private IndexRecordsInfo setDateAndTotal(
|
151 |
145 |
final String dsId,
|
152 |
146 |
final String collectedFrom,
|
153 |
|
final CloudSolrClient indexClient) throws DsmException {
|
|
147 |
final CloudSolrServer indexClient) throws DsmException {
|
154 |
148 |
try {
|
155 |
149 |
final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom);
|
156 |
150 |
log.debug(String.format("query: %s", query));
|
... | ... | |
182 |
176 |
return DateFormatUtils.format(date, DsmMappingUtils.DATE_FORMAT);
|
183 |
177 |
}
|
184 |
178 |
|
185 |
|
private synchronized CloudSolrClient getIndexClient(final IndexDsInfo info) {
|
|
179 |
private synchronized CloudSolrServer getIndexClient(final IndexDsInfo info) {
|
186 |
180 |
if (!indexClientMap.containsKey(info.getColl())) {
|
187 |
181 |
|
188 |
|
final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build();
|
|
182 |
final CloudSolrServer client = new CloudSolrServer(info.getIndexBaseUrl());
|
189 |
183 |
client.setDefaultCollection(info.getColl());
|
190 |
184 |
|
191 |
185 |
indexClientMap.put(info.getColl(), client);
|
branch for production, still using solr 4.9.x