Project

General

Profile

« Previous | Next » 

Revision 50143

huge update: swagger documentation, alignment of methods with the service trunk

View differences:

DatasourceIndexClient.java
5 5
import java.util.List;
6 6
import java.util.Map;
7 7
import java.util.Queue;
8
import java.util.concurrent.ConcurrentHashMap;
9
import java.util.concurrent.CountDownLatch;
10
import java.util.concurrent.ScheduledThreadPoolExecutor;
11
import java.util.concurrent.TimeUnit;
12
import java.util.concurrent.TimeoutException;
13

  
8
import java.util.concurrent.*;
14 9
import javax.annotation.PostConstruct;
15 10
import javax.annotation.PreDestroy;
16 11

  
12
import com.google.common.collect.Iterables;
13
import com.google.common.util.concurrent.*;
14
import eu.dnetlib.OpenaireExporterConfig;
15
import eu.dnetlib.enabling.datasources.common.DatasourceManagerException;
16
import eu.dnetlib.miscutils.functional.hash.Hashing;
17
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
18
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
19
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
17 20
import org.apache.commons.lang.StringUtils;
18 21
import org.apache.commons.lang.time.DateFormatUtils;
19
import org.apache.commons.lang3.exception.ExceptionUtils;
20 22
import org.apache.commons.logging.Log;
21 23
import org.apache.commons.logging.LogFactory;
24
import org.apache.solr.client.solrj.SolrQuery;
25
import org.apache.solr.client.solrj.SolrServerException;
26
import org.apache.solr.client.solrj.impl.CloudSolrClient;
27
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
22 28
import org.apache.solr.client.solrj.response.QueryResponse;
23 29
import org.apache.solr.common.SolrDocument;
24 30
import org.springframework.beans.factory.annotation.Autowired;
25 31
import org.springframework.http.HttpStatus;
26 32
import org.springframework.stereotype.Component;
27 33

  
28
import com.google.common.collect.Iterables;
29
import com.google.common.util.concurrent.FutureCallback;
30
import com.google.common.util.concurrent.Futures;
31
import com.google.common.util.concurrent.ListeningExecutorService;
32
import com.google.common.util.concurrent.MoreExecutors;
33
import com.google.common.util.concurrent.ThreadFactoryBuilder;
34

  
35
import eu.dnetlib.OpenaireExporterConfig;
36
import eu.dnetlib.data.index.CloudIndexClient;
37
import eu.dnetlib.data.index.CloudIndexClientException;
38
import eu.dnetlib.data.index.CloudIndexClientFactory;
39
import eu.dnetlib.enabling.datasources.common.DatasourceManagerException;
40
import eu.dnetlib.miscutils.functional.hash.Hashing;
41
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
42
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
43
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexRecordsInfo;
44

  
45 34
/**
46 35
 * Created by claudio on 20/10/2016.
47 36
 */
......
58 47

  
59 48
	private ListeningExecutorService executor;
60 49

  
61
	private static Map<String, CloudIndexClient> indexClientMap = new ConcurrentHashMap<>();
50
	private static Map<String, CloudSolrClient> indexClientMap = new ConcurrentHashMap<>();
62 51

  
63 52
	@PostConstruct
64 53
	public void init() {
......
81 70
	public IndexRecordsInfo getIndexInfo(final String dsId, final IndexDsInfo info, final Queue<Throwable> errors) throws DatasourceManagerException {
82 71
		try {
83 72
			final String collectedFrom = StringUtils.substringBefore(dsId, SEPARATOR) + SEPARATOR + Hashing.md5(StringUtils.substringAfter(dsId, SEPARATOR));
84
			final CloudIndexClient indexClient = getIndexClient(info);
73
			final CloudSolrClient indexClient = getIndexClient(info);
85 74
			final CountDownLatch latch = new CountDownLatch(2);
86 75
			final IndexRecordsInfo indexRecordInfo = new IndexRecordsInfo();
87 76

  
......
99 88

  
100 89
						@Override
101 90
						public void onFailure(final Throwable e) {
102
							log.error(ExceptionUtils.getStackTrace(e));
103 91
							errors.offer(e);
104 92
							latch.countDown();
105 93
						}
......
117 105

  
118 106
						@Override
119 107
						public void onFailure(final Throwable e) {
120
							log.error(ExceptionUtils.getStackTrace(e));
121 108
							errors.offer(e);
122 109
							latch.countDown();
123 110
						}
......
132 119

  
133 120
	private Long setFunded(
134 121
			final String collectedFrom,
135
			final CloudIndexClient indexClient) throws DatasourceManagerException {
122
			final CloudSolrClient indexClient) throws DatasourceManagerException {
136 123
		final String query =
137 124
				String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\" AND relprojectid:*", collectedFrom);
138 125
		log.debug(String.format("query: %s", query));
139 126
		try {
140
			return indexClient.query(query, 0).getResults().getNumFound();
127
			return indexClient.query(new SolrQuery(query).setRows(0)).getResults().getNumFound();
141 128
		} catch (Throwable e) {
142 129
			throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying information system", e);
143 130
		}
......
145 132

  
146 133
	private IndexRecordsInfo setDateAndTotal(
147 134
			final String collectedFrom,
148
			final CloudIndexClient indexClient) throws DatasourceManagerException {
135
			final CloudSolrClient indexClient) throws DatasourceManagerException {
149 136
		try {
150 137
			final String query = String.format("oaftype:result AND deletedbyinference:false AND collectedfromdatasourceid:\"%s\"", collectedFrom);
151 138
			log.debug(String.format("query: %s", query));
152 139

  
153
			final QueryResponse rsp = indexClient.query(query, 1);
140
			final QueryResponse rsp = indexClient.query(new SolrQuery(query).setRows(1));
154 141
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), new SolrDocument());
155 142
			if (log.isDebugEnabled()) {
156
				log.debug(String.format("got document %s", doc.toString()));
143
				log.debug(String.format("got document %s", doc.get("__indexrecordidentifier")));
157 144
			}
158 145
			// if (doc.isEmpty()) {
159 146
			// throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), String.format("cannot find document matching
......
178 165

  
179 166
	public String getLastIndexingDate(final IndexDsInfo info) throws DatasourceManagerException {
180 167
		try {
181
			final QueryResponse rsp = getIndexClient(info).query("oaftype:datasource", 1);
168
			final SolrQuery query = new SolrQuery("oaftype:datasource").setRows(1);
169
			final QueryResponse rsp = getIndexClient(info).query(query);
182 170
			final SolrDocument doc = Iterables.getFirst(rsp.getResults(), null);
183 171
			final String dsversion = doc.get("__dsversion").toString();
184 172
			return StringUtils.substringBefore(dsversion, "T");
185
		} catch (CloudIndexClientException e) {
173
		} catch (SolrServerException | IOException e) {
186 174
			throw new DatasourceManagerException(HttpStatus.INTERNAL_SERVER_ERROR.value(), "Error querying index DS profile: " + info, e);
187 175
		}
188 176
	}
189 177

  
190
	private synchronized CloudIndexClient getIndexClient(final IndexDsInfo info) throws CloudIndexClientException {
178
	private synchronized CloudSolrClient getIndexClient(final IndexDsInfo info) {
191 179
		if (!indexClientMap.containsKey(info.getColl())) {
192
			indexClientMap.put(info.getColl(), CloudIndexClientFactory.newIndexClient(info.getIndexBaseUrl(), info.getColl(), false));
180

  
181
			final CloudSolrClient client = new Builder().withZkHost(info.getIndexBaseUrl()).build();
182
			client.setDefaultCollection(info.getColl());
183

  
184
			indexClientMap.put(info.getColl(), client);
193 185
		}
194 186
		return indexClientMap.get(info.getColl());
195 187
	}

Also available in: Unified diff