Project

General

Profile

« Previous | Next » 

Revision 48892

upgraded solr version to 6.6.0

View differences:

IndexFeedMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.index;
2 2

  
3
import java.io.ByteArrayOutputStream;
3 4
import java.io.IOException;
5
import java.nio.charset.StandardCharsets;
4 6
import java.util.List;
5 7
import java.util.Map.Entry;
8
import java.util.zip.GZIPOutputStream;
6 9

  
10
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer;
11
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer.Mode;
7 12
import org.apache.commons.codec.binary.Base64;
8 13
import org.apache.commons.lang.exception.ExceptionUtils;
9 14
import org.apache.commons.logging.Log;
......
12 17
import org.apache.hadoop.io.Text;
13 18
import org.apache.hadoop.mapreduce.Mapper;
14 19
import org.apache.solr.client.solrj.SolrServerException;
15
import org.apache.solr.client.solrj.impl.CloudSolrServer;
20
import org.apache.solr.client.solrj.impl.CloudSolrClient;
16 21
import org.apache.solr.client.solrj.response.SolrPingResponse;
17 22
import org.apache.solr.client.solrj.response.UpdateResponse;
18 23
import org.apache.solr.common.SolrInputDocument;
......
32 37

  
33 38
	private InputDocumentFactory documentFactory;
34 39

  
35
	private CloudSolrServer solrServer;
40
	private CloudSolrClient solrServer;
36 41

  
37 42
	private String version;
38 43

  
......
87 92
			try {
88 93
				count++;
89 94
				log.info("initializing solr server...");
90
				solrServer = new CloudSolrServer(baseURL);
95
				solrServer = new CloudSolrClient.Builder()
96
						.withZkHost(baseURL)
97
						.build();
91 98

  
92 99
				solrServer.connect();
93 100

  
......
103 110

  
104 111
			} catch (final Throwable e) {
105 112
				if (solrServer != null) {
106
					solrServer.shutdown();
113
					solrServer.close();
107 114
				}
108 115
				context.getCounter("index init", e.getMessage()).increment(1);
109 116
				log.error(String.format("failed to init solr client wait %dms, error:\n%s", backoffTimeMs, ExceptionUtils.getStackTrace(e)));
......
123 130

  
124 131
		try {
125 132
			indexRecord = dmfToRecord.evaluate(value.toString());
126
			doc = documentFactory.parseDocument(version, indexRecord, dsId, "dnetResult");
133
			doc = documentFactory.parseDocument(version, indexRecord, dsId, "dnetResult", new ResultTransformer(Mode.base64) {
134
				@Override
135
				public String apply(final String s) {
136

  
137
					return org.apache.solr.common.util.Base64.byteArrayToBase64(zip(s));
138
				}
139
			});
127 140
			if ((doc == null) || doc.isEmpty()) throw new EmptySolrDocumentException();
128 141

  
129 142
		} catch (final Throwable e) {
......
146 159
		}
147 160
	}
148 161

  
162
	/**
	 * Compresses the given string with GZIP and returns the compressed bytes.
	 * The string is encoded as UTF-8 before being written to the GZIP stream.
	 *
	 * @param s the text to compress; must be neither {@code null} nor empty
	 * @return the GZIP-compressed bytes of the UTF-8 encoding of {@code s}
	 * @throws IllegalArgumentException if {@code s} is {@code null} or has length zero
	 * @throws RuntimeException if an {@link IOException} occurs while compressing;
	 *         the original exception is kept as the cause
	 */
	public static byte[] zip(final String s) {
163
		if ((s == null) || (s.length() == 0)) {
164
			throw new IllegalArgumentException("Cannot zip null or empty string");
165
		}
166

  
167
		try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
168
			try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
169
				gzipOutputStream.write(s.getBytes(StandardCharsets.UTF_8));
170
			}
			// The inner try-with-resources has closed the GZIP stream by this point,
			// so the buffer is read only after compression has been completed.
171
			return byteArrayOutputStream.toByteArray();
172
		} catch(IOException e) {
173
			throw new RuntimeException("Failed to zip content", e);
174
		}
175
	}
176

  
149 177
	private void addDocument(final Context context, final SolrInputDocument doc) throws SolrServerException, IOException, EmptySolrDocumentException {
150 178
		buffer.add(doc);
151 179
		if (buffer.size() >= bufferFlushThreshold) {
......
182 210
			}
183 211
			log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown");
184 212
			Thread.sleep(shutdownWaitTime);
185
			solrServer.shutdown();
213
			solrServer.close();
186 214
		} catch (final SolrServerException e) {
187 215
			log.error("couldn't shutdown server " + e.getMessage());
188 216
		}

Also available in: Unified diff