Project

General

Profile

1
package eu.dnetlib.data.index;
2

    
3
import java.io.IOException;
4
import java.text.SimpleDateFormat;
5
import java.util.Date;
6
import java.util.List;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.solr.client.solrj.SolrQuery;
11
import org.apache.solr.client.solrj.impl.CloudSolrClient;
12
import org.apache.solr.client.solrj.response.QueryResponse;
13
import org.apache.solr.client.solrj.response.UpdateResponse;
14
import org.apache.solr.common.SolrInputDocument;
15

    
16
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory;
17
import eu.dnetlib.miscutils.datetime.HumanTime;
18
import eu.dnetlib.miscutils.functional.UnaryFunction;
19

    
20
/**
21
 * Created by michele on 11/11/15.
22
 */
23
public class CloudIndexClient {
24

    
25
	private static final Log log = LogFactory.getLog(CloudIndexClient.class);
26
	private static final String INDEX_RECORD_RESULT_FIELD = "dnetResult";
27

    
28
	private final CloudSolrClient solrServer;
29

    
30
	protected CloudIndexClient(final CloudSolrClient solrServer) {
31
		this.solrServer = solrServer;
32
	}
33

    
34
	public int feed(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord) throws CloudIndexClientException {
35
		return feed(record, indexDsId, toIndexRecord, true);
36
	}
37

    
38
	public int feed(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord, final boolean commit)
39
			throws CloudIndexClientException {
40
		try {
41
			final SolrInputDocument doc = prepareSolrDocument(record, indexDsId, toIndexRecord);
42
			if ((doc == null) || doc.isEmpty()) throw new CloudIndexClientException("Invalid solr document");
43
			return feed(doc, commit);
44
		} catch (final Throwable e) {
45
			throw new CloudIndexClientException("Error feeding document", e);
46
		}
47
	}
48

    
49
	public int feed(final SolrInputDocument document) throws CloudIndexClientException {
50
		return feed(document, true);
51
	}
52

    
53
	public int feed(final SolrInputDocument document, final boolean commit) throws CloudIndexClientException {
54
		try {
55
			final UpdateResponse res = solrServer.add(document);
56
			log.debug("feed time for single records, elapsed time: " + HumanTime.exactly(res.getElapsedTime()));
57
			if (res.getStatus() != 0) { throw new CloudIndexClientException("bad status: " + res.getStatus()); }
58
			if (commit) {
59
				solrServer.commit();
60
			}
61
			return res.getStatus();
62
		} catch (final Throwable e) {
63
			throw new CloudIndexClientException("Error feeding document", e);
64
		}
65
	}
66

    
67
	public void feed(final List<SolrInputDocument> docs, final AfterFeedingCallback callback) throws CloudIndexClientException {
68
		feed(docs, callback, true);
69
	}
70

    
71
	public void feed(final List<SolrInputDocument> docs, final AfterFeedingCallback callback, final boolean commit) throws CloudIndexClientException {
72
		try {
73
			if (docs.isEmpty()) {
74
				log.debug("Empty list of documents. Calling callback, if needed.");
75
				if (callback != null) {
76
					callback.doAfterFeeding(null);
77
				}
78
				return;
79
			}
80
			final UpdateResponse res = solrServer.add(docs);
81

    
82
			log.debug("feed time for " + docs.size() + " records, elapsed tipe: : " + HumanTime.exactly(res.getElapsedTime()));
83

    
84
			if (commit) {
85
				solrServer.commit();
86
			}
87
			if (callback != null) {
88
				callback.doAfterFeeding(res);
89
			}
90
			if (res.getStatus() != 0) throw new CloudIndexClientException("bad status: " + res.getStatus());
91
		} catch (final Throwable e) {
92
			throw new CloudIndexClientException("Error feeding documents", e);
93
		}
94
	}
95

    
96
	public SolrInputDocument prepareSolrDocument(final String record, final String indexDsId, final UnaryFunction<String, String> toIndexRecord)
97
			throws CloudIndexClientException {
98
		try {
99
			final StreamingInputDocumentFactory documentFactory = new StreamingInputDocumentFactory();
100

    
101
			final String version = (new SimpleDateFormat("yyyy-MM-dd\'T\'hh:mm:ss\'Z\'")).format(new Date());
102
			final String indexRecord = toIndexRecord.evaluate(record);
103

    
104
			if (log.isDebugEnabled()) {
105
				log.debug("***************************************\nSubmitting index record:\n" + indexRecord + "\n***************************************\n");
106
			}
107

    
108
			return documentFactory.parseDocument(version, indexRecord, indexDsId, INDEX_RECORD_RESULT_FIELD);
109
		} catch (final Throwable e) {
110
			throw new CloudIndexClientException("Error creating solr document", e);
111
		}
112
	}
113

    
114
	public boolean isRecordIndexed(final String id) throws CloudIndexClientException {
115
		final QueryResponse res = query("objidentifier:\"" + id + "\"", null);
116
		return res.getResults().size() > 0;
117
	}
118

    
119
	public int remove(final String id) throws CloudIndexClientException {
120
		return remove(id, true);
121
	}
122

    
123
	public int remove(final String id, final boolean commit) throws CloudIndexClientException {
124
		try {
125
			final UpdateResponse res = solrServer.deleteByQuery("objidentifier:\"" + id + "\"");
126
			if (commit) {
127
				solrServer.commit();
128
			}
129
			return res.getResponse().size();
130
		} catch (final Throwable e) {
131
			throw new CloudIndexClientException("Error removing documents", e);
132
		}
133
	}
134

    
135
	public int count(final String query) throws CloudIndexClientException {
136
		final QueryResponse res = query(query, 0);
137
		return res.getResults().size();
138
	}
139

    
140
	public QueryResponse query(final String query, Integer rows) throws CloudIndexClientException {
141
		try {
142
			final SolrQuery solrQuery = new SolrQuery();
143
			solrQuery.setQuery(query);
144
			if(rows != null && rows >= 0) {
145
				solrQuery.setRows(rows);
146
			}
147
			return solrServer.query(solrQuery);
148
		} catch (final Throwable e) {
149
			throw new CloudIndexClientException("Error searching documents", e);
150
		}
151
	}
152

    
153
	public void close() throws IOException {
154
		if (solrServer != null) {
155
			solrServer.close();
156
		}
157
	}
158

    
159
	public interface AfterFeedingCallback {
160

    
161
		void doAfterFeeding(final UpdateResponse response);
162
	}
163
}
(1-1/3)