Revision 48892
Added by Claudio Atzori over 6 years ago
IndexFeedMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.index; |
2 | 2 |
|
3 |
import java.io.ByteArrayOutputStream; |
|
3 | 4 |
import java.io.IOException; |
5 |
import java.nio.charset.StandardCharsets; |
|
4 | 6 |
import java.util.List; |
5 | 7 |
import java.util.Map.Entry; |
8 |
import java.util.zip.GZIPOutputStream; |
|
6 | 9 |
|
10 |
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer; |
|
11 |
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer.Mode; |
|
7 | 12 |
import org.apache.commons.codec.binary.Base64; |
8 | 13 |
import org.apache.commons.lang.exception.ExceptionUtils; |
9 | 14 |
import org.apache.commons.logging.Log; |
... | ... | |
12 | 17 |
import org.apache.hadoop.io.Text; |
13 | 18 |
import org.apache.hadoop.mapreduce.Mapper; |
14 | 19 |
import org.apache.solr.client.solrj.SolrServerException; |
15 |
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
|
20 |
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
16 | 21 |
import org.apache.solr.client.solrj.response.SolrPingResponse; |
17 | 22 |
import org.apache.solr.client.solrj.response.UpdateResponse; |
18 | 23 |
import org.apache.solr.common.SolrInputDocument; |
... | ... | |
32 | 37 |
|
33 | 38 |
private InputDocumentFactory documentFactory; |
34 | 39 |
|
35 |
private CloudSolrServer solrServer;
|
|
40 |
private CloudSolrClient solrServer;
|
|
36 | 41 |
|
37 | 42 |
private String version; |
38 | 43 |
|
... | ... | |
87 | 92 |
try { |
88 | 93 |
count++; |
89 | 94 |
log.info("initializing solr server..."); |
90 |
solrServer = new CloudSolrServer(baseURL); |
|
95 |
solrServer = new CloudSolrClient.Builder() |
|
96 |
.withZkHost(baseURL) |
|
97 |
.build(); |
|
91 | 98 |
|
92 | 99 |
solrServer.connect(); |
93 | 100 |
|
... | ... | |
103 | 110 |
|
104 | 111 |
} catch (final Throwable e) { |
105 | 112 |
if (solrServer != null) { |
106 |
solrServer.shutdown();
|
|
113 |
solrServer.close();
|
|
107 | 114 |
} |
108 | 115 |
context.getCounter("index init", e.getMessage()).increment(1); |
109 | 116 |
log.error(String.format("failed to init solr client wait %dms, error:\n%s", backoffTimeMs, ExceptionUtils.getStackTrace(e))); |
... | ... | |
123 | 130 |
|
124 | 131 |
try { |
125 | 132 |
indexRecord = dmfToRecord.evaluate(value.toString()); |
126 |
doc = documentFactory.parseDocument(version, indexRecord, dsId, "dnetResult"); |
|
133 |
doc = documentFactory.parseDocument(version, indexRecord, dsId, "dnetResult", new ResultTransformer(Mode.base64) { |
|
134 |
@Override |
|
135 |
public String apply(final String s) { |
|
136 |
|
|
137 |
return org.apache.solr.common.util.Base64.byteArrayToBase64(zip(s)); |
|
138 |
} |
|
139 |
}); |
|
127 | 140 |
if ((doc == null) || doc.isEmpty()) throw new EmptySolrDocumentException(); |
128 | 141 |
|
129 | 142 |
} catch (final Throwable e) { |
... | ... | |
146 | 159 |
} |
147 | 160 |
} |
148 | 161 |
|
162 |
public static byte[] zip(final String s) { |
|
163 |
if ((s == null) || (s.length() == 0)) { |
|
164 |
throw new IllegalArgumentException("Cannot zip null or empty string"); |
|
165 |
} |
|
166 |
|
|
167 |
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { |
|
168 |
try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) { |
|
169 |
gzipOutputStream.write(s.getBytes(StandardCharsets.UTF_8)); |
|
170 |
} |
|
171 |
return byteArrayOutputStream.toByteArray(); |
|
172 |
} catch(IOException e) { |
|
173 |
throw new RuntimeException("Failed to zip content", e); |
|
174 |
} |
|
175 |
} |
|
176 |
|
|
149 | 177 |
private void addDocument(final Context context, final SolrInputDocument doc) throws SolrServerException, IOException, EmptySolrDocumentException { |
150 | 178 |
buffer.add(doc); |
151 | 179 |
if (buffer.size() >= bufferFlushThreshold) { |
... | ... | |
182 | 210 |
} |
183 | 211 |
log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown"); |
184 | 212 |
Thread.sleep(shutdownWaitTime); |
185 |
solrServer.shutdown();
|
|
213 |
solrServer.close();
|
|
186 | 214 |
} catch (final SolrServerException e) { |
187 | 215 |
log.error("couldn't shutdown server " + e.getMessage()); |
188 | 216 |
} |
Also available in: Unified diff
upgraded solr version to 6.6.0