Project

General

Profile

« Previous | Next » 

Revision 53802

branch for solr 7.5.0

View differences:

IndexFeedMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.index;
2 2

  
3
import java.io.ByteArrayOutputStream;
4
import java.io.IOException;
5
import java.nio.charset.StandardCharsets;
6
import java.util.List;
7
import java.util.Map.Entry;
8
import java.util.zip.GZIPOutputStream;
9

  
10 3
import com.google.common.collect.Lists;
11 4
import eu.dnetlib.data.mapreduce.JobParams;
12 5
import eu.dnetlib.data.proto.TypeProtos.Type;
13 6
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
14 7
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory;
8
import eu.dnetlib.functionality.index.utils.ZkServers;
15 9
import eu.dnetlib.miscutils.datetime.HumanTime;
16 10
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
17 11
import org.apache.commons.codec.binary.Base64;
......
22 16
import org.apache.hadoop.io.Text;
23 17
import org.apache.hadoop.mapreduce.Mapper;
24 18
import org.apache.solr.client.solrj.SolrServerException;
25
import org.apache.solr.client.solrj.impl.CloudSolrServer;
19
import org.apache.solr.client.solrj.impl.CloudSolrClient;
26 20
import org.apache.solr.client.solrj.response.SolrPingResponse;
27 21
import org.apache.solr.client.solrj.response.UpdateResponse;
28 22
import org.apache.solr.common.SolrInputDocument;
29 23

  
24
import java.io.ByteArrayOutputStream;
25
import java.io.IOException;
26
import java.nio.charset.StandardCharsets;
27
import java.util.List;
28
import java.util.Map.Entry;
29
import java.util.zip.GZIPOutputStream;
30

  
30 31
public class IndexFeedMapper extends Mapper<Text, Text, Text, Text> {
31 32

  
32 33
	private static final Log log = LogFactory.getLog(IndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
......
34 35

  
35 36
	private InputDocumentFactory documentFactory;
36 37

  
37
	private CloudSolrServer solrServer;
38
	private CloudSolrClient solrClient;
38 39

  
39 40
	private String version;
40 41

  
......
93 94
			try {
94 95
				count++;
95 96
				log.info("initializing solr server...");
96
				solrServer = new CloudSolrServer(baseURL);
97
				final ZkServers zk = ZkServers.newInstance(baseURL);
98
				solrClient = new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot())
99
						.withParallelUpdates(true)
100
						.build();
97 101

  
98
				solrServer.connect();
102
				solrClient.connect();
103
				solrClient.setDefaultCollection(collection);
99 104

  
100
				solrServer.setParallelUpdates(true);
101
				solrServer.setDefaultCollection(collection);
105
				final SolrPingResponse rsp = solrClient.ping();
102 106

  
103
				final SolrPingResponse rsp = solrServer.ping();
104

  
105 107
				if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus());
106 108
				else {
107 109
					break;
108 110
				}
109 111

  
110 112
			} catch (final Throwable e) {
111
				if (solrServer != null) {
112
					solrServer.shutdown();
113
				if (solrClient != null) {
114
					solrClient.close();
113 115
				}
114 116
				context.getCounter("index init", e.getMessage()).increment(1);
115 117
				log.error(String.format("failed to init solr client wait %dms, error:\n%s", backoffTimeMs, ExceptionUtils.getStackTrace(e)));
......
179 181
	private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException {
180 182
		if (!simulation) {
181 183
			final long start = System.currentTimeMillis();
182
			final UpdateResponse rsp = solrServer.add(buffer);
184
			final UpdateResponse rsp = solrClient.add(buffer);
183 185
			final long stop = System.currentTimeMillis() - start;
184 186
			log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n");
185 187

  
......
205 207
			}
206 208
			log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown");
207 209
			Thread.sleep(shutdownWaitTime);
208
			solrServer.shutdown();
210
			solrClient.close();
209 211
		} catch (final SolrServerException e) {
210 212
			log.error("couldn't shutdown server " + e.getMessage());
211 213
		}

Also available in: Unified diff