Revision 53802

branch for solr 7.5.0

DedupIndexFeedMapper.java
 package eu.dnetlib.data.mapreduce.hbase.index;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import com.google.common.collect.Lists;
 import com.googlecode.protobuf.format.JsonFormat;
 import eu.dnetlib.data.mapreduce.JobParams;
......
 import eu.dnetlib.data.proto.OafProtos.Oaf;
 import eu.dnetlib.data.transform.SolrProtoMapper;
 import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
+import eu.dnetlib.functionality.index.utils.ZkServers;
 import eu.dnetlib.miscutils.datetime.HumanTime;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.logging.Log;
......
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
 import org.dom4j.DocumentException;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 public class DedupIndexFeedMapper extends TableMapper<Text, Text> {
 
 	private static final Log log = LogFactory.getLog(DedupIndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-	private CloudSolrServer solrServer;
+	private CloudSolrClient solrClient;
 
 	private String dsId;
 
......
 		while (true) {
 			try {
 				log.info("initializing solr server...");
-				solrServer = new CloudSolrServer(baseURL);
 
-				solrServer.connect();
+				final ZkServers zk = ZkServers.newInstance(baseURL);
+				solrClient = new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot())
+						.withParallelUpdates(true)
+						.build();
 
-				solrServer.setParallelUpdates(true);
-				solrServer.setDefaultCollection(collection);
+				solrClient.connect();
+				solrClient.setDefaultCollection(collection);
 
-				final SolrPingResponse rsp = solrServer.ping();
+				final SolrPingResponse rsp = solrClient.ping();
 
 				if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus());
 				else {
......
 				}
 
 			} catch (final Throwable e) {
-				if (solrServer != null) {
-					solrServer.shutdown();
+				if (solrClient != null) {
+					solrClient.close();
 				}
 				context.getCounter("index init", e.getMessage()).increment(1);
 				log.info(String.format("failed to init solr client wait %dms", backoffTimeMs));
......
 	private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException {
 		if (!simulation) {
 			final long start = System.currentTimeMillis();
-			final UpdateResponse rsp = solrServer.add(buffer);
+			final UpdateResponse rsp = solrClient.add(buffer);
 			final long stop = System.currentTimeMillis() - start;
 			log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n");
 
......
 			}
 			log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown");
 			Thread.sleep(shutdownWaitTime);
-			solrServer.shutdown();
+			solrClient.close();
 		} catch (final SolrServerException e) {
 			System.err.println("couldn't shutdown server " + e.getMessage());
 		}
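
For context, this revision swaps the removed CloudSolrServer for the SolrJ 7.x CloudSolrClient builder API. Below is a minimal, self-contained sketch of the new client lifecycle as used in the diff (build from ZooKeeper hosts and chroot, connect, set the default collection, ping, add documents, close). The ZooKeeper addresses, chroot, and collection name are placeholders, not values taken from the project configuration.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.SolrPingResponse;
import org.apache.solr.common.SolrInputDocument;

public class CloudSolrClientLifecycleSketch {

	public static void main(final String[] args) throws Exception {
		// Placeholder ZooKeeper ensemble and chroot; the mapper derives these from its baseURL via ZkServers.newInstance(baseURL).
		final List<String> zkHosts = Arrays.asList("zk1.example.org:2181", "zk2.example.org:2181");
		final Optional<String> zkChroot = Optional.of("/solr");

		// SolrJ 7.x replaces "new CloudSolrServer(zkUrl)" with a Builder.
		final CloudSolrClient client = new CloudSolrClient.Builder(zkHosts, zkChroot)
				.withParallelUpdates(true)
				.build();
		try {
			client.connect();
			client.setDefaultCollection("my_collection"); // placeholder collection name

			// Mirrors the fail-fast check in the mapper's init loop: a non-zero ping status aborts initialization.
			final SolrPingResponse ping = client.ping();
			if (ping.getStatus() != 0) {
				throw new SolrServerException("bad init status: " + ping.getStatus());
			}

			// Feeding works as before; add() accepts a single document or a collection of them.
			final SolrInputDocument doc = new SolrInputDocument();
			doc.addField("id", "example-1");
			client.add(doc);
			client.commit();
		} finally {
			// CloudSolrClient is Closeable: close() takes over from the removed shutdown().
			client.close();
		}
	}
}

The same substitution explains the cleanup changes in the diff: both the error path in the init loop and the shutdown path now call solrClient.close() where the old code called solrServer.shutdown().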
