Project

General

Profile

1
package eu.dnetlib.functionality.index;
2

    
3
import java.io.StringReader;
4
import java.util.Map;
5

    
6
import eu.dnetlib.functionality.cql.CqlValueTransformerMap;
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.solr.client.solrj.impl.CloudSolrServer;
10
import org.dom4j.Document;
11
import org.dom4j.DocumentException;
12
import org.dom4j.io.SAXReader;
13
import org.springframework.beans.factory.annotation.Autowired;
14
import org.springframework.beans.factory.annotation.Required;
15

    
16
import com.google.common.collect.Maps;
17

    
18
import eu.dnetlib.data.provision.index.rmi.IndexServiceException;
19

    
20
import eu.dnetlib.functionality.index.feed.DocumentMapperFactory;
21
import eu.dnetlib.functionality.index.model.Any.ValueType;
22
import eu.dnetlib.functionality.index.query.IndexQueryFactory;
23
import eu.dnetlib.functionality.index.query.SolrIndexQueryFactory;
24
import eu.dnetlib.functionality.index.query.SolrIndexQueryResponseFactory;
25
import eu.dnetlib.functionality.index.solr.cql.SolrTypeBasedCqlValueTransformerMapFactory;
26
import eu.dnetlib.functionality.index.solr.feed.SolrDocumentMapperFactory;
27
import eu.dnetlib.functionality.index.utils.IndexConfigFactory;
28
import eu.dnetlib.functionality.index.utils.MetadataReference;
29
import eu.dnetlib.functionality.index.utils.RemoteSolrAdministrator;
30
import eu.dnetlib.functionality.index.utils.ZkUtils;
31

    
32
/**
33
 * The Class SolrIndexServerDAO.
34
 */
35
public class SolrIndexServerDAO extends AbstractBackendDescriptor implements IndexServerDAO {
36

    
37
	/**
38
	 * The log.
39
	 */
40
	private static final Log log = LogFactory.getLog(SolrIndexServerDAO.class); // NOPMD by marko on 11/24/08 5:02 PM
41

    
42
	/** The zk utils. */
43
	@Autowired
44
	private ZkUtils zkUtils;
45

    
46
	/** The query response factory. */
47
	@Autowired
48
	private SolrIndexQueryResponseFactory queryResponseFactory;
49

    
50
	@Autowired
51
	private SolrIndexQueryFactory solrIndexQueryFactory;
52

    
53
	/** The solr document mapper factory. */
54
	@Autowired
55
	private SolrDocumentMapperFactory solrDocumentMapperFactory;
56

    
57
	/** The solr administrator. */
58
	private RemoteSolrAdministrator solrAdministrator;
59

    
60
	/** The solr type based cql value transformer map. */
61
	@Autowired
62
	private SolrTypeBasedCqlValueTransformerMapFactory tMapFactory;
63

    
64
	/**
65
	 * {@inheritDoc}
66
	 * 
67
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#createIndexCollection(eu.dnetlib.functionality.index.utils.MetadataReference,
68
	 *      java.lang.String)
69
	 */
70
	@Override
71
	public void createIndexCollection(final MetadataReference mdref, final String fields) throws IndexServiceException {
72
		CloudSolrServer server = null;
73
		try {
74

    
75
			server = getServer();
76
			server.connect();
77

    
78
			if (!solrAdministrator.indexCollectionExists(mdref.toString(), server)) {
79
				Map<String, String> params = Maps.newHashMap();
80

    
81
				final Map<String, String> p = getServiceProperties();
82
				params.put("numShards", p.get("numShards"));
83
				params.put("replicationFactor", p.get("replicationFactor"));
84

    
85
				for (IndexConfigFactory.CONFIG_PARAMS param_Name : IndexConfigFactory.CONFIG_PARAMS.values()) {
86
					params.put(param_Name.toString(), p.get(param_Name.toString()));
87
				}
88

    
89
				zkUtils.uploadZookeperConfig(server.getZkStateReader().getZkClient(), mdref.toString(), parse(fields), params, false);
90
				solrAdministrator.createSolrIndex(p.get("host"), p.get("port"), mdref.toString(), p.get("numShards"), p.get("replicationFactor"),
91
						mdref.toString(), p.get("maxShardsPerNode"));
92
			}
93
			server.getZkStateReader().close();
94

    
95
		} catch (Exception e) {
96
			log.error("Error on creating IndexCollection", e);
97
			throw new IndexServiceException("Error on creating IndexCollection", e);
98
		} finally {
99
			if (server != null) {
100
				server.shutdown();
101
			}
102
		}
103
	}
104

    
105
	@Override
106
	public void updateIndexCollection(final MetadataReference mdRef, final Document fields) throws IndexServiceException {
107
		CloudSolrServer server = null;
108
		try {
109
			server = getServer();
110
			server.connect();
111
			Map<String, String> params = Maps.newHashMap();
112

    
113
			params.put("numShards", getServiceProperties().get("numShards"));
114
			params.put("replicationFactor", getServiceProperties().get("replicationFactor"));
115

    
116
			for (IndexConfigFactory.CONFIG_PARAMS param_Name : IndexConfigFactory.CONFIG_PARAMS.values()) {
117
				params.put(param_Name.toString(), getServiceProperties().get(param_Name.toString()));
118
			}
119

    
120
			zkUtils.uploadZookeperConfig(server.getZkStateReader().getZkClient(), mdRef.toString(), fields, params, true);
121
			server.getZkStateReader().close();
122
			server.shutdown();
123
			solrAdministrator.reloadCollection(getServiceProperties().get("host"), getServiceProperties().get("port"), mdRef.toString());
124

    
125
		} catch (Exception e) {
126
			log.error("Error on updating IndexCollection", e);
127
			throw new IndexServiceException("Error on updating IndexCollection", e);
128
		} finally {
129
			if (server != null) {
130
				server.shutdown();
131
			}
132
		}
133
	}
134

    
135
	/**
136
	 * Parses the fields parameter.
137
	 * 
138
	 * @param fields
139
	 *            the fields
140
	 * @return the document
141
	 * @throws IndexServiceException
142
	 *             the index service exception
143
	 */
144
	private Document parse(final String fields) throws IndexServiceException {
145
		try {
146
			return new SAXReader().read(new StringReader(fields));
147
		} catch (DocumentException e) {
148
			throw new IndexServiceException("unable to parse fields: " + fields, e);
149
		}
150
	}
151

    
152
	/**
153
	 * {@inheritDoc}
154
	 * 
155
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#getIndexCollection(eu.dnetlib.functionality.index.utils.MetadataReference)
156
	 */
157
	@Override
158
	public IndexCollection getIndexCollection(final MetadataReference mdref) throws IndexServiceException {
159
		CloudSolrServer newServer = getServer(mdref);
160
		return new SolrIndexCollection(newServer);
161
	}
162

    
163
	/**
164
	 * {@inheritDoc}
165
	 * 
166
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#getSchema(MetadataReference)
167
	 */
168
	@Override
169
	public Map<String, ValueType> getSchema(final MetadataReference mdRef) throws IndexServiceException {
170
		CloudSolrServer server = getServer(mdRef);
171
		Map<String, ValueType> fields = solrAdministrator.getFieldNamesAndTypes(mdRef.toString(), server);
172
		server.shutdown();
173
		return fields;
174
	}
175

    
176
	/**
177
	 * {@inheritDoc}
178
	 * 
179
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#getCqlValueTransformerMap(MetadataReference)
180
	 */
181
	@Override
182
	public CqlValueTransformerMap getCqlValueTransformerMap(final MetadataReference mdRef) throws IndexServiceException {
183
		return tMapFactory.getIt(getSchema(mdRef));
184
	}
185

    
186
	/**
187
	 * {@inheritDoc}
188
	 * 
189
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#getDocumentMapperFactory()
190
	 */
191
	@Override
192
	public DocumentMapperFactory getDocumentMapperFactory() throws IndexServiceException {
193
		return solrDocumentMapperFactory;
194
	}
195

    
196
	/**
197
	 * {@inheritDoc}
198
	 * 
199
	 * @see eu.dnetlib.functionality.index.IndexServerDAO#shutdown(MetadataReference)
200
	 */
201
	@Override
202
	public void shutdown(final MetadataReference mdRef) throws IndexServiceException {
203
		getServer(mdRef).shutdown();
204

    
205
	}
206

    
207
	/**
208
	 * Gets a server with the default collection set according to the given mdRef.
209
	 * 
210
	 * @param mdRef
211
	 *            the md ref
212
	 * @return a server instance
213
	 * @throws IndexServiceException
214
	 *             the index service exception
215
	 */
216
	private CloudSolrServer getServer(final MetadataReference mdRef) throws IndexServiceException {
217
		CloudSolrServer server = getServer();
218
		server.setDefaultCollection(mdRef.toString());
219
		return server;
220
	}
221

    
222
	/**
223
	 * Gets the server.
224
	 * 
225
	 * @return a server instance
226
	 */
227
	private CloudSolrServer getServer() {
228
		String address = getEndpoint().get(ADDRESS);
229
		log.info("connecting to address: " + address);
230
		return new CloudSolrServer(address);
231

    
232
	}
233

    
234
	/**
235
	 * Gets the solr administrator.
236
	 * 
237
	 * @return the solrAdministrator
238
	 */
239
	public RemoteSolrAdministrator getSolrAdministrator() {
240
		return solrAdministrator;
241
	}
242

    
243
	/**
244
	 * Sets the solr administrator.
245
	 * 
246
	 * @param solrAdministrator
247
	 *            the solrAdministrator to set
248
	 */
249
	@Required
250
	public void setSolrAdministrator(final RemoteSolrAdministrator solrAdministrator) {
251
		this.solrAdministrator = solrAdministrator;
252
	}
253

    
254
	@Override
255
	public IndexQueryFactory getIndexQueryFactory() {
256
		return solrIndexQueryFactory;
257
	}
258

    
259
}
(2-2/2)