Project

General

Profile

1
package eu.dnetlib.functionality.index.client.solr;
2

    
3
import java.io.IOException;
4
import java.text.SimpleDateFormat;
5
import java.util.*;
6

    
7
import com.google.common.collect.BiMap;
8
import com.google.common.collect.Maps;
9
import eu.dnetlib.data.provision.index.rmi.BrowsingRow;
10
import eu.dnetlib.data.provision.index.rmi.GroupResult;
11
import eu.dnetlib.data.provision.index.rmi.IndexServiceException;
12
import eu.dnetlib.functionality.cql.CqlValueTransformerMap;
13
import eu.dnetlib.functionality.index.client.IndexClient;
14
import eu.dnetlib.functionality.index.client.IndexClientException;
15
import eu.dnetlib.functionality.index.client.response.BrowseEntry;
16
import eu.dnetlib.functionality.index.client.response.BrowseValueEntry;
17
import eu.dnetlib.functionality.index.client.response.LookupResponse;
18
import eu.dnetlib.functionality.index.model.Any.ValueType;
19
import eu.dnetlib.functionality.index.query.*;
20
import eu.dnetlib.functionality.index.solr.cql.SolrTypeBasedCqlValueTransformerMapFactory;
21
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory;
22
import eu.dnetlib.functionality.index.utils.MetadataReference;
23
import eu.dnetlib.functionality.index.utils.ZkServers;
24
import eu.dnetlib.miscutils.datetime.HumanTime;
25
import eu.dnetlib.miscutils.functional.UnaryFunction;
26
import org.apache.commons.lang3.StringUtils;
27
import org.apache.commons.logging.Log;
28
import org.apache.commons.logging.LogFactory;
29
import org.apache.solr.client.solrj.SolrClient;
30
import org.apache.solr.client.solrj.SolrQuery;
31
import org.apache.solr.client.solrj.SolrServerException;
32
import org.apache.solr.client.solrj.impl.CloudSolrClient;
33
import org.apache.solr.client.solrj.request.LukeRequest;
34
import org.apache.solr.client.solrj.response.LukeResponse;
35
import org.apache.solr.client.solrj.response.LukeResponse.FieldInfo;
36
import org.apache.solr.client.solrj.response.QueryResponse;
37
import org.apache.solr.client.solrj.response.UpdateResponse;
38
import org.apache.solr.common.SolrInputDocument;
39

    
40
/**
41
 * The Class SolrIndexClient.
42
 */
43
public class SolrIndexClient implements IndexClient {
44

    
45
	private static final Log log = LogFactory.getLog(SolrIndexClient.class);
46

    
47
	private static final String INDEX_RECORD_RESULT_FIELD = "dnetResult";
48

    
49
	private static String ZK_ADDRESS = "address";
50

    
51
	/** The format. */
52
	private String format;
53

    
54
	/** The layout. */
55
	private String layout;
56

    
57
	/** The interpretation. */
58
	private String interpretation;
59

    
60
	protected Map<String, String> serviceProperties;
61

    
62
	/** The client. */
63
	private CloudSolrClient client;
64

    
65
	private SolrIndexQueryFactory solrIndexQueryFactory;
66

    
67
	/** The query response factory. */
68
	private QueryResponseFactory<QueryResponse> queryResponseFactory;
69

    
70
	private SolrTypeBasedCqlValueTransformerMapFactory tMapFactory;
71

    
72
	/**
73
	 * The Constructor.
74
	 *
75
	 * @param format
76
	 *            the format
77
	 * @param layout
78
	 *            the layout
79
	 * @param interpretation
80
	 *            the interpretation
81
	 * @param serviceProperties
82
	 *            the service properties
83
	 * @param tMapFactory
84
	 */
85
	public SolrIndexClient(final String format, final String layout, final String interpretation, final Map<String, String> serviceProperties,
86
			final SolrIndexQueryFactory indexQueryFactory, final QueryResponseFactory<QueryResponse> queryResponseFactory,
87
			final SolrTypeBasedCqlValueTransformerMapFactory tMapFactory) {
88
		this.format = format;
89
		this.layout = layout;
90
		this.interpretation = interpretation;
91
		this.serviceProperties = serviceProperties;
92
		this.solrIndexQueryFactory = indexQueryFactory;
93
		this.queryResponseFactory = queryResponseFactory;
94
		this.tMapFactory = tMapFactory;
95

    
96
		log.debug(String.format("Created a new instance of the index of type %s-%s-%s", format, layout, interpretation));
97
	}
98

    
99
	/**
100
	 * Do delete.
101
	 *
102
	 * @param query
103
	 *            the CQL query
104
	 * @return true, if do delete
105
	 * @throws IndexServiceException
106
	 *             the index service exception
107
	 */
108
	@Override
109
	public long delete(final String query) throws IndexClientException {
110
		try {
111
			log.debug("delete by query: " + query);
112
			MetadataReference mdRef = new MetadataReference(getFormat(), getLayout(), getInterpretation());
113
			SolrIndexQuery translatedQuery = (SolrIndexQuery) solrIndexQueryFactory.getIndexQuery(QueryLanguage.CQL, query, this, mdRef);
114
			String tquery = translatedQuery.getQuery();
115
			translatedQuery.setQueryLimit(0);
116

    
117
			SolrIndexQueryResponse rsp = new SolrIndexQueryResponse(client.query(translatedQuery));
118
			QueryResponseParser responseParser = queryResponseFactory.getQueryResponseParser(rsp, mdRef);
119
			long total = responseParser.getNumFound();
120
			client.deleteByQuery(tquery);
121
			client.commit();
122
			return total;
123
		} catch (Exception e) {
124
			throw new IndexClientException("unable to run delete by query: " + query, e);
125
		}
126
	}
127

    
128
	/**
129
	 * {@inheritDoc}
130
	 *
131
	 * @throws IndexClientException
132
	 *
133
	 * @see IndexClient#browse(String, List, int)
134
	 */
135
	@Override
136
	public List<BrowseEntry> browse(final String query, final List<String> browseFields, final int max) throws IndexClientException {
137
		MetadataReference mdRef = new MetadataReference(getFormat(), getLayout(), getInterpretation());
138
		SolrIndexQuery translatedQuery = buildBrowseQuery(query, browseFields, max, mdRef);
139
		return executeBrowseQuery(query, translatedQuery, mdRef, browseFields);
140
	}
141

    
142
	/**
143
	 * {@inheritDoc}
144
	 *
145
	 * @throws IndexClientException
146
	 *
147
	 * @see IndexClient#browse(String, List, int, List)
148
	 */
149
	@Override
150
	public List<BrowseEntry> browse(final String query, final List<String> browseFields, final int max, final List<String> filterQuery)
151
			throws IndexClientException {
152
		MetadataReference mdRef = new MetadataReference(getFormat(), getLayout(), getInterpretation());
153
		SolrIndexQuery translatedQuery = buildBrowseQuery(query, browseFields, max, mdRef);
154
		if (filterQuery != null) {
155
			log.debug("Filter Query:");
156
			for (String fq : filterQuery) {
157
				translatedQuery.addFilterQuery(fq);
158
				log.debug("- " + fq);
159
			}
160
		}
161
		return executeBrowseQuery(query, translatedQuery, mdRef, browseFields);
162

    
163
	}
164

    
165
	private SolrIndexQuery buildBrowseQuery(final String query, final List<String> browseFields, final int max, final MetadataReference mdRef)
166
			throws IndexClientException {
167
		log.debug("Browse request for the index collection for query:" + query);
168

    
169
		SolrIndexQuery translatedQuery = (SolrIndexQuery) solrIndexQueryFactory.getIndexQuery(QueryLanguage.CQL, query, this, mdRef);
170
		translatedQuery.setFacet(true);
171
		if (browseFields != null) {
172
			List<String> browsableFields = solrIndexQueryFactory.getBrowsableFields(browseFields, mdRef);
173
			log.debug("Browsing fields:");
174
			for (String field : browsableFields) {
175
				translatedQuery.addFacetField(field);
176
				log.debug("- " + field);
177

    
178
			}
179
			translatedQuery.setFacetLimit(max);
180
			log.debug("max number of browsing field :" + max);
181
		}
182
		return translatedQuery;
183
	}
184

    
185
	private List<BrowseEntry> executeBrowseQuery(final String originalQuery,
186
			final SolrIndexQuery query,
187
			final MetadataReference mdRef,
188
			final List<String> browseFields) throws IndexClientException {
189
		try {
190
			SolrIndexQueryResponse response = new SolrIndexQueryResponse(client.query(query));
191
			QueryResponseParser responseParser = queryResponseFactory.getQueryResponseParser(response, mdRef);
192
			List<BrowsingRow> results = responseParser.getBrowsingResults();
193
			List<BrowseEntry> out = convertBrowseEntry(browseFields, results, responseParser.getAliases());
194
			return out;
195
		} catch (SolrServerException | IOException e) {
196
			throw new IndexClientException("Error on executing a query " + originalQuery, e);
197
		}
198
	}
199

    
200
	/**
201
	 * Creates the connection.
202
	 *
203
	 * @return the string
204
	 * @throws IndexClientException
205
	 *             the index client exception
206
	 */
207
	private String getUrl() throws IndexClientException {
208
		String address = serviceProperties.get(ZK_ADDRESS);
209
		if (StringUtils.isBlank(address)) {
210
			throw new IndexClientException("Unable to load a solr client, missing zk address");
211
		}
212
		return address;
213
	}
214

    
215
	/**
216
	 * Gets the client.
217
	 *
218
	 * @return the client
219
	 * @throws IndexClientException
220
	 *             the index client exception
221
	 */
222
	public SolrClient getClient() throws IndexClientException {
223
		if (this.client == null) {
224
			String url = getUrl();
225
			log.debug("create new Client " + url);
226

    
227
			final ZkServers zk = ZkServers.newInstance(url);
228
			final CloudSolrClient client = new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot()).build();
229

    
230
			client.connect();
231
			client.setDefaultCollection(String.format("%s-%s-%s", getFormat(), getLayout(), getInterpretation()));
232
			try {
233
				client.ping();
234
			} catch (Exception e) {
235
				throw new IndexClientException("oops something went wrong", e);
236
			}
237
		}
238
		return client;
239
	}
240

    
241
	/**
242
	 * Sets the client.
243
	 *
244
	 * @param client
245
	 *            the client
246
	 */
247
	public void setClient(final CloudSolrClient client) {
248
		this.client = client;
249
	}
250

    
251
	@Override
252
	public LookupResponse lookup(final String query, final List<String> filterQuery, final int from, final int to) throws IndexClientException {
253
		log.debug("lookup request for the index collection for query:" + query);
254
		MetadataReference mdRef = new MetadataReference(getFormat(), getLayout(), getInterpretation());
255
		SolrIndexQuery translatedQuery = (SolrIndexQuery) solrIndexQueryFactory.getIndexQuery(QueryLanguage.CQL, query, this, mdRef);
256
		translatedQuery.setQueryOffset(from);
257
		translatedQuery.setQueryLimit(to - from + 1);
258
		if (filterQuery != null) {
259
			for (String fq : filterQuery) {
260
				translatedQuery.addFilterQuery(fq);
261
			}
262
		}
263

    
264
		try {
265
			SolrIndexQueryResponse response = new SolrIndexQueryResponse(client.query(translatedQuery));
266
			QueryResponseParser responseParser = queryResponseFactory.getQueryResponseParser(response, mdRef);
267

    
268
			return new LookupResponse(responseParser);
269
		} catch (SolrServerException | IOException e) {
270
			throw new IndexClientException("Error on executing a query " + query, e);
271
		}
272

    
273
	}
274

    
275
	@Override
276
	public CqlValueTransformerMap getCqlValueTransformerMap(final MetadataReference mdRef) throws IndexClientException {
277
		try {
278
			return tMapFactory.getIt(readFieldNamesAndTypes());
279
		} catch (Exception e) {
280
			throw new IndexClientException(e);
281
		}
282
	}
283

    
284
	@Override
285
	public IndexQueryFactory getIndexQueryFactory() {
286
		return solrIndexQueryFactory;
287
	}
288

    
289
	@Override
290
	public void close() throws IOException {
291
		log.debug("shutdown client: " + serviceProperties.get(ZK_ADDRESS));
292
		client.close();
293
	}
294

    
295
	public int feed(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord) throws IndexClientException {
296
		return feed(record, indexDsId, toIndexRecord, true);
297
	}
298

    
299
	public int feed(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord, final boolean commit)
300
			throws IndexClientException {
301
		try {
302
			final SolrInputDocument doc = prepareSolrDocument(record, indexDsId, toIndexRecord);
303
			if ((doc == null) || doc.isEmpty()) throw new IndexClientException("Invalid solr document");
304
			return feed(doc, commit);
305
		} catch (final Throwable e) {
306
			throw new IndexClientException("Error feeding document", e);
307
		}
308
	}
309

    
310
	public int feed(final SolrInputDocument document) throws IndexClientException {
311
		return feed(document, true);
312
	}
313

    
314
	public int feed(final List<SolrInputDocument> document) throws IndexClientException {
315
		try {
316
			final UpdateResponse res = client.add(document);
317
			log.debug("feed time for single records, elapsed time: " + HumanTime.exactly(res.getElapsedTime()));
318
			if (res.getStatus() != 0) { throw new IndexClientException("bad status: " + res.getStatus()); }
319
			return res.getStatus();
320
		} catch (final Throwable e) {
321
			throw new IndexClientException("Error feeding document", e);
322
		}
323
	}
324

    
325
	public int feed(final SolrInputDocument document, final boolean commit) throws IndexClientException {
326
		try {
327
			final UpdateResponse res = client.add(document);
328
			log.debug("feed time for single records, elapsed time: " + HumanTime.exactly(res.getElapsedTime()));
329
			if (res.getStatus() != 0) { throw new IndexClientException("bad status: " + res.getStatus()); }
330
			if (commit) {
331
				client.commit();
332
			}
333
			return res.getStatus();
334
		} catch (final Throwable e) {
335
			throw new IndexClientException("Error feeding document", e);
336
		}
337
	}
338

    
339
	public void feed(final List<SolrInputDocument> docs, final AfterFeedingCallback callback) throws IndexClientException {
340
		feed(docs, callback, true);
341
	}
342

    
343
	public void feed(final List<SolrInputDocument> docs, final AfterFeedingCallback callback, final boolean commit) throws IndexClientException {
344
		try {
345
			if (docs.isEmpty()) {
346
				log.debug("Empty list of documents. Calling callback, if needed.");
347
				if (callback != null) {
348
					callback.doAfterFeeding(null);
349
				}
350
				return;
351
			}
352
			final UpdateResponse res = client.add(docs);
353

    
354
			log.debug("feed time for " + docs.size() + " records, elapsed tipe: : " + HumanTime.exactly(res.getElapsedTime()));
355

    
356
			if (commit) {
357
				client.commit();
358
			}
359
			if (callback != null) {
360
				callback.doAfterFeeding(res);
361
			}
362
			if (res.getStatus() != 0) throw new IndexClientException("bad status: " + res.getStatus());
363
		} catch (final Throwable e) {
364
			throw new IndexClientException("Error feeding documents", e);
365
		}
366
	}
367

    
368
	public SolrInputDocument prepareSolrDocument(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord)
369
			throws IndexClientException {
370
		try {
371
			final StreamingInputDocumentFactory documentFactory = new StreamingInputDocumentFactory();
372

    
373
			final String version = (new SimpleDateFormat("yyyy-MM-dd\'T\'hh:mm:ss\'Z\'")).format(new Date());
374
			final String indexRecord = toIndexRecord.evaluate(record);
375

    
376
			if (log.isDebugEnabled()) {
377
				log.debug("***************************************\nSubmitting index record:\n" + indexRecord + "\n***************************************\n");
378
			}
379

    
380
			return documentFactory.parseDocument(version, indexRecord, indexDsId, INDEX_RECORD_RESULT_FIELD);
381
		} catch (final Throwable e) {
382
			throw new IndexClientException("Error creating solr document", e);
383
		}
384
	}
385

    
386
	public boolean isRecordIndexed(final String id) throws IndexClientException {
387
		final QueryResponse res = query("objidentifier:\"" + id + "\"", null);
388
		return res.getResults().size() > 0;
389
	}
390

    
391
	public int remove(final String id) throws IndexClientException {
392
		return remove(id, true);
393
	}
394

    
395
	public UpdateResponse commit() throws IndexClientException {
396
		try {
397
			return client.commit();
398
		} catch (SolrServerException | IOException e) {
399
			throw new IndexClientException(e);
400
		}
401
	}
402

    
403
	public int remove(final String id, final boolean commit) throws IndexClientException {
404
		try {
405
			final UpdateResponse res = client.deleteByQuery("objidentifier:\"" + id + "\"");
406
			if (commit) {
407
				client.commit();
408
			}
409
			return res.getResponse().size();
410
		} catch (final Throwable e) {
411
			throw new IndexClientException("Error removing documents", e);
412
		}
413
	}
414

    
415
	public int count(final String query) throws IndexClientException {
416
		final QueryResponse res = query(query, 0);
417
		return res.getResults().size();
418
	}
419

    
420
	public QueryResponse query(final String query, Integer rows) throws IndexClientException {
421
		try {
422
			final SolrQuery solrQuery = new SolrQuery();
423
			solrQuery.setQuery(query);
424
			if(rows != null && rows >= 0) {
425
				solrQuery.setRows(rows);
426
			}
427
			return client.query(solrQuery);
428
		} catch (final Throwable e) {
429
			throw new IndexClientException("Error searching documents", e);
430
		}
431
	}
432

    
433
	public UpdateResponse deleteByQuery(final String query) throws IndexClientException {
434
		try {
435
			return client.deleteByQuery(query);
436
		} catch (final Throwable e) {
437
			throw new IndexClientException("Error searching documents", e);
438
		}
439
	}
440

    
441
	public interface AfterFeedingCallback {
442

    
443
		void doAfterFeeding(final UpdateResponse response);
444
	}
445

    
446
	/**
447
	 * Gets the format.
448
	 *
449
	 * @return the format
450
	 */
451
	public String getFormat() {
452
		return format;
453
	}
454

    
455
	/**
456
	 * Sets the format.
457
	 *
458
	 * @param format
459
	 *            the format
460
	 */
461
	public void setFormat(final String format) {
462
		this.format = format;
463
	}
464

    
465
	/**
466
	 * Gets the layout.
467
	 *
468
	 * @return the layout
469
	 */
470
	public String getLayout() {
471
		return layout;
472
	}
473

    
474
	/**
475
	 * Sets the layout.
476
	 *
477
	 * @param layout
478
	 *            the layout
479
	 */
480
	public void setLayout(final String layout) {
481
		this.layout = layout;
482
	}
483

    
484
	/**
485
	 * Gets the interpretation.
486
	 *
487
	 * @return the interpretation
488
	 */
489
	public String getInterpretation() {
490
		return interpretation;
491
	}
492

    
493
	/**
494
	 * Sets the interpretation.
495
	 *
496
	 * @param interpretation
497
	 *            the interpretation
498
	 */
499
	public void setInterpretation(final String interpretation) {
500
		this.interpretation = interpretation;
501
	}
502

    
503
	public Map<String, String> getServiceProperties() {
504
		return serviceProperties;
505
	}
506

    
507
	public void setServiceProperties(final Map<String, String> serviceProperties) {
508
		this.serviceProperties = serviceProperties;
509
	}
510

    
511
	// HELPERS
512

    
513
	private List<BrowseEntry> convertBrowseEntry(final List<String> browseFields, final List<BrowsingRow> results, final BiMap<String, String> aliases) {
514

    
515
		Map<String, BrowseEntry> mapResult = new HashMap<String, BrowseEntry>();
516
		for (BrowsingRow row : results) {
517
			for (GroupResult groupResult : row.getGroupResult()) {
518
				String name = groupResult.getName();
519
				List<BrowseValueEntry> valuesEntry;
520
				BrowseEntry entry;
521
				if (mapResult.containsKey(name)) {
522
					entry = mapResult.get(name);
523
					valuesEntry = entry.getValues();
524
					if (valuesEntry == null) {
525
						valuesEntry = new ArrayList<BrowseValueEntry>();
526
						entry.setValues(valuesEntry);
527
					}
528

    
529
				} else {
530
					entry = new BrowseEntry();
531
					entry.setField(name);
532
					entry.setLabel(name);
533
					valuesEntry = new ArrayList<BrowseValueEntry>();
534
					entry.setValues(valuesEntry);
535
					mapResult.put(name, entry);
536
				}
537
				String value = groupResult.getValue();
538
				int count = groupResult.getCount();
539
				BrowseValueEntry entryValue = new BrowseValueEntry(value, count);
540
				valuesEntry.add(entryValue);
541
			}
542
		}
543
		List<BrowseEntry> out = new ArrayList<BrowseEntry>();
544
		for (String b : browseFields) {
545
			String inverse = null;
546
			if (aliases != null) {
547
				inverse = aliases.get(b) != null ? aliases.get(b) : aliases.inverse().get(b);
548
			}
549
			if (mapResult.containsKey(b)) {
550
				out.add(mapResult.get(b));
551
			} else if (mapResult.containsKey(inverse) == true) {
552
				BrowseEntry data = mapResult.get(inverse);
553
				data.setField(b);
554
				out.add(data);
555
			}
556
		}
557
		return out;
558
	}
559

    
560
	private Map<String, ValueType> readFieldNamesAndTypes() throws SolrServerException, IOException, IndexClientException {
561

    
562
		final LukeRequest request = new LukeRequest();
563
		request.setShowSchema(true);
564

    
565
		request.setNumTerms(0);
566
		final LukeResponse response = request.process(getClient());
567
		final Map<String, FieldInfo> fieldInfos = response.getFieldInfo();
568
		final Map<String, LukeResponse.FieldTypeInfo> fieldTypeInfos = response.getFieldTypeInfo();
569
		final Map<String, ValueType> result = Maps.newHashMap();
570
		for (FieldInfo fieldInfo : fieldInfos.values()) {
571
			LukeResponse.FieldTypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldInfo.getType());
572
			final String fieldName = fieldTypeInfo.getName().toLowerCase();
573
			final ValueType fieldType = resolveSolrTypeClassName(fieldName);
574
			result.put(fieldInfo.getName(), fieldType);
575
		}
576
		return result;
577
	}
578

    
579
	private ValueType resolveSolrTypeClassName(final String solrTypeName) {
580
		if (solrTypeName.contains("LongField")) {
581
			return ValueType.LONG;
582
		} else if (solrTypeName.contains("IntField")) {
583
			return ValueType.LONG;
584
		} else if (solrTypeName.contains("short")) {
585
			return ValueType.LONG;
586
		} else if (solrTypeName.contains("float")) {
587
			return ValueType.DOUBLE;
588
		} else if (solrTypeName.contains("double")) {
589
			return ValueType.DOUBLE;
590
		} else if (solrTypeName.contains("date")) {
591
			return ValueType.DATETIME;
592
		} else {
593
			return ValueType.STRING;
594
		}
595
	}
596

    
597
}
(1-1/2)