Revision 53802
Added by Claudio Atzori over 5 years ago
IndexFeedMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.index; |
2 | 2 |
|
3 |
import java.io.ByteArrayOutputStream; |
|
4 |
import java.io.IOException; |
|
5 |
import java.nio.charset.StandardCharsets; |
|
6 |
import java.util.List; |
|
7 |
import java.util.Map.Entry; |
|
8 |
import java.util.zip.GZIPOutputStream; |
|
9 |
|
|
10 | 3 |
import com.google.common.collect.Lists; |
11 | 4 |
import eu.dnetlib.data.mapreduce.JobParams; |
12 | 5 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
13 | 6 |
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory; |
14 | 7 |
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory; |
8 |
import eu.dnetlib.functionality.index.utils.ZkServers; |
|
15 | 9 |
import eu.dnetlib.miscutils.datetime.HumanTime; |
16 | 10 |
import eu.dnetlib.miscutils.functional.xml.ApplyXslt; |
17 | 11 |
import org.apache.commons.codec.binary.Base64; |
... | ... | |
22 | 16 |
import org.apache.hadoop.io.Text; |
23 | 17 |
import org.apache.hadoop.mapreduce.Mapper; |
24 | 18 |
import org.apache.solr.client.solrj.SolrServerException; |
25 |
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
|
19 |
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
26 | 20 |
import org.apache.solr.client.solrj.response.SolrPingResponse; |
27 | 21 |
import org.apache.solr.client.solrj.response.UpdateResponse; |
28 | 22 |
import org.apache.solr.common.SolrInputDocument; |
29 | 23 |
|
24 |
import java.io.ByteArrayOutputStream; |
|
25 |
import java.io.IOException; |
|
26 |
import java.nio.charset.StandardCharsets; |
|
27 |
import java.util.List; |
|
28 |
import java.util.Map.Entry; |
|
29 |
import java.util.zip.GZIPOutputStream; |
|
30 |
|
|
30 | 31 |
public class IndexFeedMapper extends Mapper<Text, Text, Text, Text> { |
31 | 32 |
|
32 | 33 |
private static final Log log = LogFactory.getLog(IndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM |
... | ... | |
34 | 35 |
|
35 | 36 |
private InputDocumentFactory documentFactory; |
36 | 37 |
|
37 |
private CloudSolrServer solrServer;
|
|
38 |
private CloudSolrClient solrClient;
|
|
38 | 39 |
|
39 | 40 |
private String version; |
40 | 41 |
|
... | ... | |
93 | 94 |
try { |
94 | 95 |
count++; |
95 | 96 |
log.info("initializing solr server..."); |
96 |
solrServer = new CloudSolrServer(baseURL); |
|
97 |
final ZkServers zk = ZkServers.newInstance(baseURL); |
|
98 |
solrClient = new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot()) |
|
99 |
.withParallelUpdates(true) |
|
100 |
.build(); |
|
97 | 101 |
|
98 |
solrServer.connect(); |
|
102 |
solrClient.connect(); |
|
103 |
solrClient.setDefaultCollection(collection); |
|
99 | 104 |
|
100 |
solrServer.setParallelUpdates(true); |
|
101 |
solrServer.setDefaultCollection(collection); |
|
105 |
final SolrPingResponse rsp = solrClient.ping(); |
|
102 | 106 |
|
103 |
final SolrPingResponse rsp = solrServer.ping(); |
|
104 |
|
|
105 | 107 |
if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus()); |
106 | 108 |
else { |
107 | 109 |
break; |
108 | 110 |
} |
109 | 111 |
|
110 | 112 |
} catch (final Throwable e) { |
111 |
if (solrServer != null) {
|
|
112 |
solrServer.shutdown();
|
|
113 |
if (solrClient != null) {
|
|
114 |
solrClient.close();
|
|
113 | 115 |
} |
114 | 116 |
context.getCounter("index init", e.getMessage()).increment(1); |
115 | 117 |
log.error(String.format("failed to init solr client wait %dms, error:\n%s", backoffTimeMs, ExceptionUtils.getStackTrace(e))); |
... | ... | |
179 | 181 |
private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException { |
180 | 182 |
if (!simulation) { |
181 | 183 |
final long start = System.currentTimeMillis(); |
182 |
final UpdateResponse rsp = solrServer.add(buffer);
|
|
184 |
final UpdateResponse rsp = solrClient.add(buffer);
|
|
183 | 185 |
final long stop = System.currentTimeMillis() - start; |
184 | 186 |
log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n"); |
185 | 187 |
|
... | ... | |
205 | 207 |
} |
206 | 208 |
log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown"); |
207 | 209 |
Thread.sleep(shutdownWaitTime); |
208 |
solrServer.shutdown();
|
|
210 |
solrClient.close();
|
|
209 | 211 |
} catch (final SolrServerException e) { |
210 | 212 |
log.error("couldn't shutdown server " + e.getMessage()); |
211 | 213 |
} |
Also available in: Unified diff
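Branch for Solr 7.5.0: IndexFeedMapper now uses CloudSolrClient (built from the ZooKeeper hosts and chroot) in place of CloudSolrServer.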