Revision 53802
Added by Claudio Atzori over 5 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/IndexFeedMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.index; |
2 | 2 |
|
3 |
import java.io.ByteArrayOutputStream; |
|
4 |
import java.io.IOException; |
|
5 |
import java.nio.charset.StandardCharsets; |
|
6 |
import java.util.List; |
|
7 |
import java.util.Map.Entry; |
|
8 |
import java.util.zip.GZIPOutputStream; |
|
9 |
|
|
10 | 3 |
import com.google.common.collect.Lists; |
11 | 4 |
import eu.dnetlib.data.mapreduce.JobParams; |
12 | 5 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
13 | 6 |
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory; |
14 | 7 |
import eu.dnetlib.functionality.index.solr.feed.StreamingInputDocumentFactory; |
8 |
import eu.dnetlib.functionality.index.utils.ZkServers; |
|
15 | 9 |
import eu.dnetlib.miscutils.datetime.HumanTime; |
16 | 10 |
import eu.dnetlib.miscutils.functional.xml.ApplyXslt; |
17 | 11 |
import org.apache.commons.codec.binary.Base64; |
... | ... | |
22 | 16 |
import org.apache.hadoop.io.Text; |
23 | 17 |
import org.apache.hadoop.mapreduce.Mapper; |
24 | 18 |
import org.apache.solr.client.solrj.SolrServerException; |
25 |
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
|
19 |
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
26 | 20 |
import org.apache.solr.client.solrj.response.SolrPingResponse; |
27 | 21 |
import org.apache.solr.client.solrj.response.UpdateResponse; |
28 | 22 |
import org.apache.solr.common.SolrInputDocument; |
29 | 23 |
|
24 |
import java.io.ByteArrayOutputStream; |
|
25 |
import java.io.IOException; |
|
26 |
import java.nio.charset.StandardCharsets; |
|
27 |
import java.util.List; |
|
28 |
import java.util.Map.Entry; |
|
29 |
import java.util.zip.GZIPOutputStream; |
|
30 |
|
|
30 | 31 |
public class IndexFeedMapper extends Mapper<Text, Text, Text, Text> { |
31 | 32 |
|
32 | 33 |
private static final Log log = LogFactory.getLog(IndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM |
... | ... | |
34 | 35 |
|
35 | 36 |
private InputDocumentFactory documentFactory; |
36 | 37 |
|
37 |
private CloudSolrServer solrServer;
|
|
38 |
private CloudSolrClient solrClient;
|
|
38 | 39 |
|
39 | 40 |
private String version; |
40 | 41 |
|
... | ... | |
93 | 94 |
try { |
94 | 95 |
count++; |
95 | 96 |
log.info("initializing solr server..."); |
96 |
solrServer = new CloudSolrServer(baseURL); |
|
97 |
final ZkServers zk = ZkServers.newInstance(baseURL); |
|
98 |
solrClient = new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot()) |
|
99 |
.withParallelUpdates(true) |
|
100 |
.build(); |
|
97 | 101 |
|
98 |
solrServer.connect(); |
|
102 |
solrClient.connect(); |
|
103 |
solrClient.setDefaultCollection(collection); |
|
99 | 104 |
|
100 |
solrServer.setParallelUpdates(true); |
|
101 |
solrServer.setDefaultCollection(collection); |
|
105 |
final SolrPingResponse rsp = solrClient.ping(); |
|
102 | 106 |
|
103 |
final SolrPingResponse rsp = solrServer.ping(); |
|
104 |
|
|
105 | 107 |
if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus()); |
106 | 108 |
else { |
107 | 109 |
break; |
108 | 110 |
} |
109 | 111 |
|
110 | 112 |
} catch (final Throwable e) { |
111 |
if (solrServer != null) {
|
|
112 |
solrServer.shutdown();
|
|
113 |
if (solrClient != null) {
|
|
114 |
solrClient.close();
|
|
113 | 115 |
} |
114 | 116 |
context.getCounter("index init", e.getMessage()).increment(1); |
115 | 117 |
log.error(String.format("failed to init solr client wait %dms, error:\n%s", backoffTimeMs, ExceptionUtils.getStackTrace(e))); |
... | ... | |
179 | 181 |
private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException { |
180 | 182 |
if (!simulation) { |
181 | 183 |
final long start = System.currentTimeMillis(); |
182 |
final UpdateResponse rsp = solrServer.add(buffer);
|
|
184 |
final UpdateResponse rsp = solrClient.add(buffer);
|
|
183 | 185 |
final long stop = System.currentTimeMillis() - start; |
184 | 186 |
log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n"); |
185 | 187 |
|
... | ... | |
205 | 207 |
} |
206 | 208 |
log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown"); |
207 | 209 |
Thread.sleep(shutdownWaitTime); |
208 |
solrServer.shutdown();
|
|
210 |
solrClient.close();
|
|
209 | 211 |
} catch (final SolrServerException e) { |
210 | 212 |
log.error("couldn't shutdown server " + e.getMessage()); |
211 | 213 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/DedupIndexFeedMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.index; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
4 |
import java.util.List; |
|
5 |
import java.util.Map; |
|
6 |
import java.util.Map.Entry; |
|
7 |
|
|
8 | 3 |
import com.google.common.collect.Lists; |
9 | 4 |
import com.googlecode.protobuf.format.JsonFormat; |
10 | 5 |
import eu.dnetlib.data.mapreduce.JobParams; |
... | ... | |
12 | 7 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
13 | 8 |
import eu.dnetlib.data.transform.SolrProtoMapper; |
14 | 9 |
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory; |
10 |
import eu.dnetlib.functionality.index.utils.ZkServers; |
|
15 | 11 |
import eu.dnetlib.miscutils.datetime.HumanTime; |
16 | 12 |
import org.apache.commons.collections.MapUtils; |
17 | 13 |
import org.apache.commons.logging.Log; |
... | ... | |
23 | 19 |
import org.apache.hadoop.hbase.util.Bytes; |
24 | 20 |
import org.apache.hadoop.io.Text; |
25 | 21 |
import org.apache.solr.client.solrj.SolrServerException; |
26 |
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
|
22 |
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
27 | 23 |
import org.apache.solr.client.solrj.response.SolrPingResponse; |
28 | 24 |
import org.apache.solr.client.solrj.response.UpdateResponse; |
29 | 25 |
import org.apache.solr.common.SolrInputDocument; |
30 | 26 |
import org.dom4j.DocumentException; |
31 | 27 |
|
28 |
import java.io.IOException; |
|
29 |
import java.util.List; |
|
30 |
import java.util.Map; |
|
31 |
import java.util.Map.Entry; |
|
32 |
|
|
32 | 33 |
public class DedupIndexFeedMapper extends TableMapper<Text, Text> { |
33 | 34 |
|
34 | 35 |
private static final Log log = LogFactory.getLog(DedupIndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM |
35 | 36 |
|
36 |
private CloudSolrServer solrServer;
|
|
37 |
private CloudSolrClient solrClient;
|
|
37 | 38 |
|
38 | 39 |
private String dsId; |
39 | 40 |
|
... | ... | |
97 | 98 |
while (true) { |
98 | 99 |
try { |
99 | 100 |
log.info("initializing solr server..."); |
100 |
solrServer = new CloudSolrServer(baseURL); |
|
101 | 101 |
|
102 |
solrServer.connect(); |
|
102 |
final ZkServers zk = ZkServers.newInstance(baseURL); |
|
103 |
solrClient = new CloudSolrClient.Builder(zk.getHosts(), zk.getChroot()) |
|
104 |
.withParallelUpdates(true) |
|
105 |
.build(); |
|
103 | 106 |
|
104 |
solrServer.setParallelUpdates(true);
|
|
105 |
solrServer.setDefaultCollection(collection);
|
|
107 |
solrClient.connect();
|
|
108 |
solrClient.setDefaultCollection(collection);
|
|
106 | 109 |
|
107 |
final SolrPingResponse rsp = solrServer.ping();
|
|
110 |
final SolrPingResponse rsp = solrClient.ping();
|
|
108 | 111 |
|
109 | 112 |
if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus()); |
110 | 113 |
else { |
... | ... | |
112 | 115 |
} |
113 | 116 |
|
114 | 117 |
} catch (final Throwable e) { |
115 |
if (solrServer != null) {
|
|
116 |
solrServer.shutdown();
|
|
118 |
if (solrClient != null) {
|
|
119 |
solrClient.close();
|
|
117 | 120 |
} |
118 | 121 |
context.getCounter("index init", e.getMessage()).increment(1); |
119 | 122 |
log.info(String.format("failed to init solr client wait %dms", backoffTimeMs)); |
... | ... | |
182 | 185 |
private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException { |
183 | 186 |
if (!simulation) { |
184 | 187 |
final long start = System.currentTimeMillis(); |
185 |
final UpdateResponse rsp = solrServer.add(buffer);
|
|
188 |
final UpdateResponse rsp = solrClient.add(buffer);
|
|
186 | 189 |
final long stop = System.currentTimeMillis() - start; |
187 | 190 |
log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n"); |
188 | 191 |
|
... | ... | |
203 | 206 |
} |
204 | 207 |
log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown"); |
205 | 208 |
Thread.sleep(shutdownWaitTime); |
206 |
solrServer.shutdown();
|
|
209 |
solrClient.close();
|
|
207 | 210 |
} catch (final SolrServerException e) { |
208 | 211 |
System.err.println("couldn't shutdown server " + e.getMessage()); |
209 | 212 |
} |
modules/dnet-mapreduce-jobs/trunk/pom.xml | ||
---|---|---|
3 | 3 |
<parent> |
4 | 4 |
<groupId>eu.dnetlib</groupId> |
5 | 5 |
<artifactId>dnet45-parent</artifactId> |
6 |
<version>1.0.0</version> |
|
6 |
<version>1.0.0-SNAPSHOT</version>
|
|
7 | 7 |
<relativePath /> |
8 | 8 |
</parent> |
9 | 9 |
<modelVersion>4.0.0</modelVersion> |
10 | 10 |
<groupId>eu.dnetlib</groupId> |
11 | 11 |
<artifactId>dnet-mapreduce-jobs</artifactId> |
12 |
<version>1.1.11-MASTER-SNAPSHOT</version>
|
|
12 |
<version>1.1.11-solr75-SNAPSHOT</version>
|
|
13 | 13 |
<packaging>jar</packaging> |
14 | 14 |
<scm> |
15 |
<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-mapreduce-jobs/branches/master</developerConnection>
|
|
15 |
<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-mapreduce-jobs/trunk</developerConnection>
|
|
16 | 16 |
</scm> |
17 | 17 |
<build> |
18 | 18 |
<plugins> |
... | ... | |
195 | 195 |
<dependency> |
196 | 196 |
<groupId>eu.dnetlib</groupId> |
197 | 197 |
<artifactId>dnet-openaireplus-mapping-utils</artifactId> |
198 |
<version>[6.2.19]</version>
|
|
198 |
<version>[6.2.20-solr75-SNAPSHOT]</version>
|
|
199 | 199 |
</dependency> |
200 | 200 |
<dependency> |
201 | 201 |
<groupId>org.antlr</groupId> |
Also available in: Unified diff
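Branch for Solr 7.5.0: migrates IndexFeedMapper and DedupIndexFeedMapper from CloudSolrServer to CloudSolrClient (built via CloudSolrClient.Builder from the ZooKeeper hosts and chroot) and updates the pom versions to the solr75 snapshots.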