Project

General

Profile

1
package eu.dnetlib.index.utils;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5

    
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.apache.http.HttpResponse;
9
import org.apache.http.client.HttpClient;
10
import org.apache.http.client.methods.HttpGet;
11
import org.apache.solr.client.solrj.SolrClient;
12
import org.apache.solr.client.solrj.SolrServerException;
13
import org.apache.solr.client.solrj.impl.CloudSolrClient;
14
import org.apache.solr.client.solrj.request.LukeRequest;
15
import org.apache.solr.client.solrj.response.LukeResponse;
16
import org.apache.solr.client.solrj.response.LukeResponse.FieldInfo;
17
import org.apache.solr.client.solrj.response.SolrPingResponse;
18
import org.apache.solr.common.cloud.ClusterState;
19
import org.apache.solr.common.cloud.DocCollection;
20
import org.apache.solr.common.cloud.Replica;
21
import org.apache.solr.common.cloud.SolrZkClient;
22
import org.apache.zookeeper.WatchedEvent;
23
import org.apache.zookeeper.Watcher;
24
import org.apache.zookeeper.data.Stat;
25
import org.springframework.beans.factory.annotation.Required;
26

    
27
import com.google.common.collect.Maps;
28
import com.google.gson.JsonElement;
29
import com.google.gson.JsonObject;
30
import com.google.gson.JsonParser;
31

    
32
import eu.dnetlib.clients.index.model.Any.ValueType;
33
import eu.dnetlib.rmi.provision.IndexServiceException;
34

    
35
public class RemoteSolrAdministrator {
36

    
37
	/**
38
	 * The log.
39
	 */
40
	private static final Log log = LogFactory.getLog(RemoteSolrAdministrator.class);
41

    
42
	/** The create url request. */
43
	private static String createURLRequest = "http://%s:%s/solr/admin/collections?action=CREATE&name=%s&numShards=%s&replicationFactor=%s&maxShardsPerNode=%s&collection.configName=%s";
44

    
45
	/** The create url request. */
46
	private static String reloadURLRequest = "http://%s:%s/solr/admin/collections?action=RELOAD&name=%s";
47

    
48
	/** The http client. */
49
	private HttpClient httpClient;
50

    
51
	protected Map<String, Map<String, ValueType>> cachedSchema;
52

    
53
	public RemoteSolrAdministrator() {
54
		this.cachedSchema = Maps.newHashMap();
55
	}
56

    
57
	public boolean createSolrIndex(final String host,
58
			final String port,
59
			final String collectionName,
60
			final String numShard,
61
			final String replicationFactor,
62
			final String maxShardsPerNode,
63
		final String collectionConfigName) throws IndexServiceException {
64

    
65
		final String uri = generateCreateIndexRequest(host, port, collectionName, numShard, replicationFactor, maxShardsPerNode, collectionConfigName);
66
		log.info(uri);
67
		HttpGet request = new HttpGet(uri);
68
		HttpResponse response;
69
		try {
70
			response = getHttpClient().execute(request);
71

    
72
		} catch (Exception e) {
73
			throw new IndexServiceException("Unable to send request to solr server", e);
74
		} finally {
75
			request.releaseConnection();
76
		}
77
		if (response.getStatusLine().getStatusCode() != 200)
78
			throw new IndexServiceException("Error on creating index the error code from solr is" + response.toString());
79
		return false;
80
	}
81

    
82
	public boolean indexCollectionExists(final String indexCollectionId, final CloudSolrClient client) {
83
log.debug("indexCollectionExists " + indexCollectionId + "   "  + client.toString());
84
		Watcher watcher = new Watcher() {
85

    
86
			@Override
87
			public void process(final WatchedEvent arg0) {}
88
		};
89
		try {
90
			// server.connect();
91
			SolrZkClient zkClient = client.getZkStateReader().getZkClient();
92
			if (!zkClient.isConnected()) {
93
				client.close();
94
				client.connect();
95
			}
96

    
97
			final ClusterState clusterState = client.getZkStateReader().getClusterState();
98
			log.debug("indexCollectionExists clusterState " + clusterState);
99
			final DocCollection collection = clusterState.getCollectionOrNull(indexCollectionId);
100
			log.debug("indexCollectionExists collection " + collection);
101
			if (collection!=null) {
102
				client.setDefaultCollection(indexCollectionId);
103
				client.connect();
104
				SolrPingResponse status = client.ping();
105
				log.debug("indexCollectionExists status.getResponseHeader() " +  status.getResponseHeader());
106
				return (Integer) status.getResponseHeader().get("status") == 0;
107
			} else {
108
				log.debug("indexCollectionExists jobject.has(indexCollectionId) FALSE " );
109
				return false;
110
			}
111

    
112
		} catch (Exception e) {
113
			log.error(e);
114
			return false;
115
		}
116
	}
117

    
118
	private ValueType resolveSolrTypeClassName(final String solrTypeName) {
119
		if (solrTypeName.contains("LongField")) return ValueType.LONG;
120
		else if (solrTypeName.contains("IntField")) return ValueType.LONG;
121
		else if (solrTypeName.contains("short")) return ValueType.LONG;
122
		else if (solrTypeName.contains("float")) return ValueType.DOUBLE;
123
		else if (solrTypeName.contains("double")) return ValueType.DOUBLE;
124
		else if (solrTypeName.contains("date")) return ValueType.DATETIME;
125
		else return ValueType.STRING;
126
	}
127

    
128
	public Map<String, ValueType> getFieldNamesAndTypes(final String coreName, final SolrClient client) throws IndexServiceException {
129
		try {
130
			if (cachedSchema.get(coreName) == null) {
131
				synchronized (cachedSchema) {
132
					Map<String, ValueType> schema = readFieldNamesAndTypes(coreName, client);
133
					log.info("setting cache for schema of collection: " + coreName);
134
					cachedSchema.put(coreName, schema);
135
				}
136
			}
137
			return cachedSchema.get(coreName);
138
		} catch (Exception e) {
139
			throw new IndexServiceException("Unable to get Schema for " + coreName + " exception", e);
140
		}
141
	}
142

    
143
	private Map<String, ValueType> readFieldNamesAndTypes(final String coreName, final SolrClient client) throws SolrServerException, IOException {
144
		final LukeRequest request = new LukeRequest();
145
		request.setShowSchema(true);
146
		request.setNumTerms(0);
147
		final LukeResponse response = request.process(client);
148
		final Map<String, FieldInfo> fieldInfos = response.getFieldInfo();
149
		final Map<String, LukeResponse.FieldTypeInfo> fieldTypeInfos = response.getFieldTypeInfo();
150
		final Map<String, ValueType> result = Maps.newHashMap();
151
		for (FieldInfo fieldInfo : fieldInfos.values()) {
152
			LukeResponse.FieldTypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldInfo.getType());
153
			final String fieldName = fieldTypeInfo.getName().toLowerCase();
154
			final ValueType fieldType = resolveSolrTypeClassName(fieldName);
155
			result.put(fieldInfo.getName(), fieldType);
156
		}
157
		return result;
158
	}
159

    
160
	private String generateCreateIndexRequest(final String host,
161
			final String port,
162
			final String collectionName,
163
			final String numShard,
164
			final String replicationFactor,
165
			final String collectionConfigName,
166
			final String maxShardsPerNode) {
167
		return String.format(createURLRequest, host, port, collectionName, numShard, replicationFactor, maxShardsPerNode, collectionConfigName);
168
	}
169

    
170
	private String generateUpdateIndexRequest(final String host, final String port, final String collectionName) {
171
		return String.format(reloadURLRequest, host, port, collectionName);
172
	}
173

    
174
	/**
175
	 * @return the httpClient
176
	 */
177
	public HttpClient getHttpClient() {
178
		return httpClient;
179
	}
180

    
181
	/**
182
	 * @param httpClient
183
	 *            the httpClient to set
184
	 */
185
	@Required
186
	public void setHttpClient(final HttpClient httpClient) {
187
		log.info("setting http client " + httpClient.getClass());
188
		this.httpClient = httpClient;
189
	}
190

    
191
	public void reloadCollection(final String host, final String port, final String collectionName) throws IndexServiceException {
192
		log.info("creating the request of reload index " + generateUpdateIndexRequest(host, port, collectionName));
193
		HttpGet request = new HttpGet(generateUpdateIndexRequest(host, port, collectionName));
194
		HttpResponse response;
195
		try {
196
			response = httpClient.execute(request);
197
		} catch (Exception e) {
198
			throw new IndexServiceException("Unable to send request to solr server", e);
199
		} finally {
200
			request.releaseConnection();
201
		}
202
		if (response.getStatusLine().getStatusCode() != 200)
203
			throw new IndexServiceException("Error on reloading index the error code from solr is" + response.toString());
204

    
205
	}
206

    
207
}
(6-6/9)