Revision 53802
Added by Claudio Atzori over 5 years ago
DedupIndexFeedMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.index; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
4 |
import java.util.List; |
|
5 |
import java.util.Map; |
|
6 |
import java.util.Map.Entry; |
|
7 |
|
|
8 | 3 |
import com.google.common.collect.Lists; |
9 | 4 |
import com.googlecode.protobuf.format.JsonFormat; |
10 | 5 |
import eu.dnetlib.data.mapreduce.JobParams; |
... | ... | |
12 | 7 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
13 | 8 |
import eu.dnetlib.data.transform.SolrProtoMapper; |
14 | 9 |
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory; |
10 |
import eu.dnetlib.functionality.index.utils.ZkServers; |
|
15 | 11 |
import eu.dnetlib.miscutils.datetime.HumanTime; |
16 | 12 |
import org.apache.commons.collections.MapUtils; |
17 | 13 |
import org.apache.commons.logging.Log; |
... | ... | |
23 | 19 |
import org.apache.hadoop.hbase.util.Bytes; |
24 | 20 |
import org.apache.hadoop.io.Text; |
25 | 21 |
import org.apache.solr.client.solrj.SolrServerException; |
26 |
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
|
22 |
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
27 | 23 |
import org.apache.solr.client.solrj.response.SolrPingResponse; |
28 | 24 |
import org.apache.solr.client.solrj.response.UpdateResponse; |
29 | 25 |
import org.apache.solr.common.SolrInputDocument; |
30 | 26 |
import org.dom4j.DocumentException; |
31 | 27 |
|
28 |
import java.io.IOException; |
|
29 |
import java.util.List; |
|
30 |
import java.util.Map; |
|
31 |
import java.util.Map.Entry; |
|
32 |
|
|
32 | 33 |
public class DedupIndexFeedMapper extends TableMapper<Text, Text> { |
33 | 34 |
|
34 | 35 |
private static final Log log = LogFactory.getLog(DedupIndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM |
35 | 36 |
|
36 |
private CloudSolrServer solrServer;
|
|
37 |
private CloudSolrClient solrClient;
|
|
37 | 38 |
|
38 | 39 |
private String dsId; |
39 | 40 |
|
... | ... | |
97 | 98 |
while (true) { |
98 | 99 |
try { |
99 | 100 |
log.info("initializing solr server..."); |
100 |
solrServer = new CloudSolrServer(baseURL); |
|
101 | 101 |
|
102 |
solrServer.connect(); |
|
102 |
final ZkServers zk = ZkServers.newInstance(baseURL); |
|
103 |
solrClient = new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot()) |
|
104 |
.withParallelUpdates(true) |
|
105 |
.build(); |
|
103 | 106 |
|
104 |
solrServer.setParallelUpdates(true);
|
|
105 |
solrServer.setDefaultCollection(collection);
|
|
107 |
solrClient.connect();
|
|
108 |
solrClient.setDefaultCollection(collection);
|
|
106 | 109 |
|
107 |
final SolrPingResponse rsp = solrServer.ping();
|
|
110 |
final SolrPingResponse rsp = solrClient.ping();
|
|
108 | 111 |
|
109 | 112 |
if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus()); |
110 | 113 |
else { |
... | ... | |
112 | 115 |
} |
113 | 116 |
|
114 | 117 |
} catch (final Throwable e) { |
115 |
if (solrServer != null) {
|
|
116 |
solrServer.shutdown();
|
|
118 |
if (solrClient != null) {
|
|
119 |
solrClient.close();
|
|
117 | 120 |
} |
118 | 121 |
context.getCounter("index init", e.getMessage()).increment(1); |
119 | 122 |
log.info(String.format("failed to init solr client wait %dms", backoffTimeMs)); |
... | ... | |
182 | 185 |
private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException { |
183 | 186 |
if (!simulation) { |
184 | 187 |
final long start = System.currentTimeMillis(); |
185 |
final UpdateResponse rsp = solrServer.add(buffer);
|
|
188 |
final UpdateResponse rsp = solrClient.add(buffer);
|
|
186 | 189 |
final long stop = System.currentTimeMillis() - start; |
187 | 190 |
log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n"); |
188 | 191 |
|
... | ... | |
203 | 206 |
} |
204 | 207 |
log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown"); |
205 | 208 |
Thread.sleep(shutdownWaitTime); |
206 |
solrServer.shutdown();
|
|
209 |
solrClient.close();
|
|
207 | 210 |
} catch (final SolrServerException e) { |
208 | 211 |
System.err.println("couldn't shutdown server " + e.getMessage()); |
209 | 212 |
} |
Also available in: Unified diff
Branch for Solr 7.5.0