Project

General

Profile

1
package eu.dnetlib.data.index;
2

    
3
import java.io.Closeable;
4
import java.io.IOException;
5
import java.text.SimpleDateFormat;
6
import java.util.Date;
7
import java.util.List;
8

    
9
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory;
10
import eu.dnetlib.miscutils.datetime.HumanTime;
11
import eu.dnetlib.miscutils.functional.UnaryFunction;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.apache.solr.client.solrj.SolrQuery;
15
import org.apache.solr.client.solrj.SolrServerException;
16
import org.apache.solr.client.solrj.impl.CloudSolrServer;
17
import org.apache.solr.client.solrj.response.QueryResponse;
18
import org.apache.solr.client.solrj.response.UpdateResponse;
19
import org.apache.solr.common.SolrInputDocument;
20

    
21
/**
22
 * Created by michele on 11/11/15.
23
 */
24
public class CloudIndexClient implements Closeable {
25

    
26
	private static final Log log = LogFactory.getLog(CloudIndexClient.class);
27
	private static final String INDEX_RECORD_RESULT_FIELD = "dnetResult";
28

    
29
	private final CloudSolrServer solrClient;
30

    
31
	protected CloudIndexClient(final CloudSolrServer solrServer) {
32
		this.solrClient = solrServer;
33
	}
34

    
35
	public int feed(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord) throws CloudIndexClientException {
36
		return feed(record, indexDsId, toIndexRecord, true);
37
	}
38

    
39
	public int feed(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord, final boolean commit)
40
			throws CloudIndexClientException {
41
		try {
42
			final SolrInputDocument doc = prepareSolrDocument(record, indexDsId, toIndexRecord);
43
			if ((doc == null) || doc.isEmpty()) throw new CloudIndexClientException("Invalid solr document");
44
			return feed(doc, commit);
45
		} catch (final Throwable e) {
46
			throw new CloudIndexClientException("Error feeding document", e);
47
		}
48
	}
49

    
50
	public int feed(final SolrInputDocument document) throws CloudIndexClientException {
51
		return feed(document, true);
52
	}
53

    
54
	public int feed(final SolrInputDocument document, final boolean commit) throws CloudIndexClientException {
55
		try {
56
			final UpdateResponse res = solrClient.add(document);
57
			log.debug("feed time for single records, elapsed time: " + HumanTime.exactly(res.getElapsedTime()));
58
			if (res.getStatus() != 0) { throw new CloudIndexClientException("bad status: " + res.getStatus()); }
59
			if (commit) {
60
				solrClient.commit();
61
			}
62
			return res.getStatus();
63
		} catch (final Throwable e) {
64
			throw new CloudIndexClientException("Error feeding document", e);
65
		}
66
	}
67

    
68
	public void feed(final List<SolrInputDocument> docs, final AfterFeedingCallback callback) throws CloudIndexClientException {
69
		feed(docs, callback, true);
70
	}
71

    
72
	public void feed(final List<SolrInputDocument> docs, final AfterFeedingCallback callback, final boolean commit) throws CloudIndexClientException {
73
		try {
74
			if (docs.isEmpty()) {
75
				log.debug("Empty list of documents. Calling callback, if needed.");
76
				if (callback != null) {
77
					callback.doAfterFeeding(null);
78
				}
79
				return;
80
			}
81
			final UpdateResponse res = solrClient.add(docs);
82

    
83
			log.debug("feed time for " + docs.size() + " records, elapsed tipe: : " + HumanTime.exactly(res.getElapsedTime()));
84

    
85
			if (commit) {
86
				solrClient.commit();
87
			}
88
			if (callback != null) {
89
				callback.doAfterFeeding(res);
90
			}
91
			if (res.getStatus() != 0) throw new CloudIndexClientException("bad status: " + res.getStatus());
92
		} catch (final Throwable e) {
93
			throw new CloudIndexClientException("Error feeding documents", e);
94
		}
95
	}
96

    
97
	public SolrInputDocument prepareSolrDocument(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord)
98
			throws CloudIndexClientException {
99
		try {
100
			final StreamingInputDocumentFactory documentFactory = new StreamingInputDocumentFactory();
101

    
102
			final String version = (new SimpleDateFormat("yyyy-MM-dd\'T\'hh:mm:ss\'Z\'")).format(new Date());
103
			final String indexRecord = toIndexRecord.evaluate(record);
104

    
105
			if (log.isDebugEnabled()) {
106
				log.debug("***************************************\nSubmitting index record:\n" + indexRecord + "\n***************************************\n");
107
			}
108

    
109
			return documentFactory.parseDocument(version, indexRecord, indexDsId, INDEX_RECORD_RESULT_FIELD);
110
		} catch (final Throwable e) {
111
			throw new CloudIndexClientException("Error creating solr document", e);
112
		}
113
	}
114

    
115
	public boolean isRecordIndexed(final String id) throws CloudIndexClientException {
116
		final QueryResponse res = query("objidentifier:\"" + id + "\"", null);
117
		return res.getResults().size() > 0;
118
	}
119

    
120
	public int remove(final String id) throws CloudIndexClientException {
121
		return remove(id, true);
122
	}
123

    
124
	public int remove(final String id, final boolean commit) throws CloudIndexClientException {
125
		try {
126
			final UpdateResponse res = solrClient.deleteByQuery("objidentifier:\"" + id + "\"");
127
			if (commit) {
128
				solrClient.commit();
129
			}
130
			return res.getResponse().size();
131
		} catch (final Throwable e) {
132
			throw new CloudIndexClientException("Error removing documents", e);
133
		}
134
	}
135

    
136
	public int count(final String query) throws CloudIndexClientException {
137
		final QueryResponse res = query(query, 0);
138
		return res.getResults().size();
139
	}
140

    
141
	public QueryResponse query(final String query, Integer rows) throws CloudIndexClientException {
142
		try {
143
			final SolrQuery solrQuery = new SolrQuery();
144
			solrQuery.setQuery(query);
145
			if(rows != null && rows >= 0) {
146
				solrQuery.setRows(rows);
147
			}
148
			return solrClient.query(solrQuery);
149
		} catch (final Throwable e) {
150
			throw new CloudIndexClientException("Error searching documents", e);
151
		}
152
	}
153

    
154
	public void close() throws IOException {
155
		if (solrClient != null) {
156
			solrClient.shutdown();
157
		}
158
	}
159

    
160
	public void commit() throws CloudIndexClientException {
161
		if(solrClient != null) {
162
			try {
163
				solrClient.commit();
164
			} catch (SolrServerException | IOException e) {
165
				throw new CloudIndexClientException(e.getMessage());
166
			}
167
		}
168
	}
169

    
170
	public interface AfterFeedingCallback {
171

    
172
		void doAfterFeeding(final UpdateResponse response);
173
	}
174
}
(1-1/3)