1
|
package eu.dnetlib.openaire.exporter.datasource.clients;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.Calendar;
|
5
|
import java.util.Map;
|
6
|
import java.util.Queue;
|
7
|
import java.util.concurrent.*;
|
8
|
import javax.annotation.PostConstruct;
|
9
|
import javax.annotation.PreDestroy;
|
10
|
|
11
|
import com.google.common.collect.Iterables;
|
12
|
import com.google.common.util.concurrent.*;
|
13
|
import eu.dnetlib.OpenaireExporterConfig;
|
14
|
import eu.dnetlib.miscutils.functional.hash.Hashing;
|
15
|
import eu.dnetlib.openaire.exporter.datasource.ApiException;
|
16
|
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
|
17
|
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
|
18
|
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
|
19
|
import org.apache.commons.lang.StringUtils;
|
20
|
import org.apache.commons.lang.time.DateFormatUtils;
|
21
|
import org.apache.commons.lang3.exception.ExceptionUtils;
|
22
|
import org.apache.commons.logging.Log;
|
23
|
import org.apache.commons.logging.LogFactory;
|
24
|
import org.apache.solr.client.solrj.SolrClient;
|
25
|
import org.apache.solr.client.solrj.SolrQuery;
|
26
|
import org.apache.solr.client.solrj.SolrServerException;
|
27
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
28
|
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
|
29
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
30
|
import org.apache.solr.common.SolrDocument;
|
31
|
import org.springframework.beans.factory.annotation.Autowired;
|
32
|
import org.springframework.cache.annotation.Cacheable;
|
33
|
import org.springframework.http.HttpStatus;
|
34
|
import org.springframework.stereotype.Component;
|
35
|
|
36
|
/**
|
37
|
* Created by claudio on 20/10/2016.
|
38
|
*/
|
39
|
@Component
|
40
|
public class DatasourceIndexClient {
|
41
|
|
42
|
private static final Log log = LogFactory.getLog(DatasourceIndexClient.class);
|
43
|
|
44
|
public static final String SEPARATOR = "::";
|
45
|
public static final String DSVERSION = "__dsversion";
|
46
|
|
47
|
@Autowired
|
48
|
private OpenaireExporterConfig config;
|
49
|
|
50
|
private ListeningExecutorService executor;
|
51
|
|
52
|
private static Map<String, SolrClient> indexClientMap = new ConcurrentHashMap<>();
|
53
|
|
54
|
@PostConstruct
|
55
|
public void init() {
|
56
|
executor = MoreExecutors.listeningDecorator(
|
57
|
new ScheduledThreadPoolExecutor(2,
|
58
|
new ThreadFactoryBuilder().setNameFormat("datasource-index-client-%d").build()));
|
59
|
}
|
60
|
|
61
|
@PreDestroy
|
62
|
public void tearDown() {
|
63
|
indexClientMap.forEach((name, client) -> {
|
64
|
try {
|
65
|
client.close();
|
66
|
} catch (IOException e) {
|
67
|
log.warn(String.format("unable to gracefully shutdown client for index %s", name));
|
68
|
}
|
69
|
});
|
70
|
}
|
71
|
|
72
|
public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws ApiException {
|
73
|
try {
|
74
|
final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
|
75
|
final SolrClient indexClient = getIndexClient(info);
|
76
|
final CountDownLatch latch = new CountDownLatch(2);
|
77
|
final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
|
78
|
|
79
|
Futures.addCallback(
|
80
|
executor.submit(() -> setTotal(collectedFrom, indexClient)),
|
81
|
new FutureCallback<IndexRecordsInfo>() {
|
82
|
@Override
|
83
|
public void onSuccess(final IndexRecordsInfo info) {
|
84
|
indexRecordInfo
|
85
|
.setTotal(info.getTotal())
|
86
|
.setDate(info.getDate());
|
87
|
latch.countDown();
|
88
|
}
|
89
|
|
90
|
@Override
|
91
|
public void onFailure(final Throwable e) {
|
92
|
log.error(ExceptionUtils.getStackTrace(e));
|
93
|
errors.offer(e);
|
94
|
latch.countDown();
|
95
|
}
|
96
|
});
|
97
|
|
98
|
Futures.addCallback(
|
99
|
executor.submit(() -> setFunded(collectedFrom, indexClient)),
|
100
|
new FutureCallback<Long>() {
|
101
|
@Override
|
102
|
public void onSuccess(final Long numFound) {
|
103
|
indexRecordInfo.setFunded(numFound);
|
104
|
latch.countDown();
|
105
|
}
|
106
|
|
107
|
@Override
|
108
|
public void onFailure(final Throwable e) {
|
109
|
log.error(ExceptionUtils.getStackTrace(e));
|
110
|
errors.offer(e);
|
111
|
latch.countDown();
|
112
|
}
|
113
|
});
|
114
|
|
115
|
waitLatch(latch, errors, config.getRequestTimeout());
|
116
|
return indexRecordInfo;
|
117
|
} catch (final Throwable e) {
|
118
|
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying publications from: " + dsId, e);
|
119
|
}
|
120
|
}
|
121
|
|
122
|
private Long setFunded(
|
123
|
final String collectedFrom,
|
124
|
final SolrClient indexClient) throws ApiException {
|
125
|
|
126
|
final SolrQuery query = new SolrQuery(
|
127
|
String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom))
|
128
|
.setRows(0);
|
129
|
|
130
|
log.debug(String.format("query: %s", query));
|
131
|
try {
|
132
|
return indexClient.query(query).getResults().getNumFound();
|
133
|
} catch (Throwable e) {
|
134
|
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
|
135
|
}
|
136
|
}
|
137
|
|
138
|
private IndexRecordsInfo setTotal(
|
139
|
final String collectedFrom,
|
140
|
final SolrClient indexClient) throws ApiException {
|
141
|
try {
|
142
|
final SolrQuery query = new SolrQuery(
|
143
|
String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom))
|
144
|
.setRows(1);
|
145
|
log.debug(String.format("query: %s", query));
|
146
|
|
147
|
final QueryResponse rsp = indexClient.query(query);
|
148
|
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
|
149
|
if (log.isDebugEnabled()) {
|
150
|
log.debug(String.format("got document %s", doc.toString()));
|
151
|
}
|
152
|
|
153
|
return new IndexRecordsInfo()
|
154
|
.setDate(getLastIndexingDate(indexClient))
|
155
|
.setTotal(rsp.getResults().getNumFound());
|
156
|
} catch (Throwable e) {
|
157
|
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
|
158
|
}
|
159
|
}
|
160
|
|
161
|
@Cacheable("index-cache")
|
162
|
public String getLastIndexingDate(final SolrClient indexClient) throws ApiException {
|
163
|
try {
|
164
|
final QueryResponse rsp = indexClient.query(new SolrQuery("oaftype:datasource").setRows(1));
|
165
|
if (rsp.getResults().getNumFound() > 0) {
|
166
|
final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
|
167
|
|
168
|
final String dsversion = doc.get(DSVERSION).toString();
|
169
|
return StringUtils.substringBefore(dsversion, "T");
|
170
|
} else {
|
171
|
final String defaultDate = getDefaultLastIndexingDate();
|
172
|
log.debug("unable to find documents, defaulting to " + defaultDate);
|
173
|
return defaultDate;
|
174
|
}
|
175
|
} catch (SolrServerException | IOException e) {
|
176
|
throw new ApiException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile", e);
|
177
|
}
|
178
|
}
|
179
|
|
180
|
private String getDefaultLastIndexingDate() {
|
181
|
final Calendar cal = Calendar.getInstance();
|
182
|
cal.add(Calendar.MONTH, -1);
|
183
|
return DateFormatUtils.format(cal.getTime(), DatasourceFunctions.DATE_FORMAT);
|
184
|
}
|
185
|
|
186
|
private synchronized SolrClient getIndexClient(final IndexDsInfo info) {
|
187
|
if (!indexClientMap.containsKey(info.getColl())) {
|
188
|
|
189
|
final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build();
|
190
|
client.setDefaultCollection(info.getColl());
|
191
|
|
192
|
indexClientMap.put(info.getColl(), client);
|
193
|
}
|
194
|
return indexClientMap.get(info.getColl());
|
195
|
}
|
196
|
|
197
|
private void waitLatch(final CountDownLatch latch, final Queue<Throwable> errors, final int waitSeconds) {
|
198
|
try {
|
199
|
if (!latch.await(waitSeconds, TimeUnit.SECONDS)) {
|
200
|
errors.offer(new TimeoutException("Waiting for requests to complete has timed out."));
|
201
|
}
|
202
|
} catch (final InterruptedException e) {
|
203
|
errors.offer(e);
|
204
|
}
|
205
|
}
|
206
|
|
207
|
}
|