Project

General

Profile

1
package eu.dnetlib.functionality.index.solr.feed;
2

    
3
import java.io.IOException;
4
import java.net.MalformedURLException;
5
import java.net.URI;
6
import java.net.URL;
7
import java.util.Collection;
8
import java.util.Iterator;
9
import java.util.List;
10
import java.util.regex.Matcher;
11
import java.util.regex.Pattern;
12

    
13
import org.apache.commons.lang.StringUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.apache.solr.client.solrj.SolrServer;
17
import org.apache.solr.client.solrj.SolrServerException;
18
import org.apache.solr.client.solrj.impl.CloudSolrServer;
19
import org.apache.solr.client.solrj.impl.HttpSolrServer;
20
import org.apache.solr.client.solrj.response.UpdateResponse;
21
import org.apache.solr.common.SolrInputDocument;
22

    
23
import com.google.common.base.Splitter;
24
import com.google.common.collect.Lists;
25
import com.google.common.hash.HashFunction;
26
import com.google.common.hash.Hashing;
27

    
28
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
29

    
30
public class SolrServerPool {
31

    
32
	/**
33
	 * logger.
34
	 */
35
	private static final Log log = LogFactory.getLog(SolrServerPool.class); // NOPMD by marko on 11/24/08 5:02 PM
36

    
37
	/*
38
	 * We run into problems when using the ConcurrentUpdateSolrServer, so we're sticking to the HttpSolrServer.
39
	 */
40
	private final List<HttpSolrServer> updateServerPool = Lists.newArrayList();
41

    
42
	private CloudSolrServer cloudServer = null;
43

    
44
	private final HashFunction hash = Hashing.murmur3_32();
45

    
46
	public SolrServerPool(final String updateUrlLocal, final String updateUrlList, final String zkHost, final String collection, final boolean localFeeding) {
47
		for (URL url : parseUrlListPattern(updateUrlLocal, updateUrlList, localFeeding)) {
48
			String collectionUrl = url + "/" + collection;
49
			log.info("creating new HttpSolrServer: " + collectionUrl);
50
			System.out.println("creating new HttpSolrServer: " + collectionUrl);
51
			updateServerPool.add(new HttpSolrServer(collectionUrl));
52
		}
53
		if (!StringUtils.isBlank(zkHost)) {
54
			log.info("creating new CloudSolrServer: " + zkHost);
55
			System.out.println("creating new CloudSolrServer: " + zkHost);
56
			cloudServer = new CloudSolrServer(zkHost);
57
			cloudServer.setDefaultCollection(collection);
58
		}
59
	}
60

    
61
	public UpdateResponse add(final SolrInputDocument doc) throws SolrServerException, IOException {
62
		return updateServerPool.get(hashPick(doc)).add(doc);
63
	}
64

    
65
	public UpdateResponse addAll(final Iterator<SolrInputDocument> docs) throws SolrServerException, IOException {
66
		if (updateServerPool.size() == 1) return updateServerPool.get(0).add(docs);
67
		int i = Integer.parseInt(DnetXsltFunctions.randomInt(updateServerPool.size()));
68
		return updateServerPool.get(i).add(docs);
69
	}
70

    
71
	public UpdateResponse addAll(final Collection<SolrInputDocument> docs) throws SolrServerException, IOException {
72
		if (updateServerPool.size() == 1) return updateServerPool.get(0).add(docs);
73
		int i = Integer.parseInt(DnetXsltFunctions.randomInt(updateServerPool.size()));
74
		return updateServerPool.get(i).add(docs);
75
	}
76

    
77
	public void deleteByQuery(final String query) throws SolrServerException, IOException {
78
		if (cloudServer != null) {
79
			cloudServer.deleteByQuery(query);
80
		} else {
81
			updateServerPool.get(0).deleteByQuery(query);
82
		}
83
	}
84

    
85
	public void commitAll() throws SolrServerException, IOException {
86
		if (cloudServer != null) {
87
			cloudServer.commit();
88
		} else {
89
			updateServerPool.get(0).commit();
90
		}
91
	}
92

    
93
	public void shutdownAll() throws SolrServerException {
94
		if (cloudServer != null) {
95
			cloudServer.shutdown();
96
		}
97
		for (SolrServer server : updateServerPool) {
98
			server.shutdown();
99
		}
100
	}
101

    
102
	// //////////////////
103

    
104
	private int hashPick(final SolrInputDocument doc) {
105
		final int hashCode = hash.hashBytes(doc.getFieldValue("__indexrecordidentifier").toString().getBytes()).asInt();
106
		return Math.abs(hashCode) % updateServerPool.size();
107
	}
108

    
109
	public List<URL> parseUrlListPattern(final String local, final String list, final boolean localFeeding) {
110
		final List<URL> res = Lists.newArrayList();
111
		try {
112
			if (localFeeding) {
113
				res.add(new URL(local));
114
			} else {
115
				Matcher matcher = Pattern.compile("(^.*)\\[(\\d+)\\.\\.(\\d+)\\](.*$)").matcher(list);
116
				if (matcher.matches()) {
117
					final String prefix = matcher.group(1);
118
					int lb = Integer.parseInt(matcher.group(2));
119
					int ub = Integer.parseInt(matcher.group(3));
120
					final String suffix = matcher.group(4);
121

    
122
					for (int i = lb; i <= ub; i++) {
123
						res.add(new URL(prefix + i + suffix));
124
					}
125
				}
126
			}
127
		} catch (MalformedURLException e) {
128
			throw new IllegalArgumentException("invalid url list: " + list, e);
129
		}
130

    
131
		log.info("parsed url(s): " + res);
132
		return res;
133
	}
134

    
135
	public List<URL> parseUrlList(final String list) throws MalformedURLException {
136
		final List<URL> res = Lists.newArrayList();
137
		for (final String url : Splitter.on(",").trimResults().split(list)) {
138
			res.add(URI.create(url).toURL());
139
		}
140
		return res;
141
	}
142

    
143
}
(2-2/3)