Revision 51005

using solrj 7.x client
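The change replaces the SolrJ 5.x-era CloudSolrServer with CloudSolrClient, its replacement in SolrJ 7.x: the client is now built through CloudSolrClient.Builder, and teardown calls close() instead of the removed shutdown(). A minimal sketch of the pattern applied throughout this revision, assuming zkHost points at a reachable ZooKeeper ensemble and collection names an existing collection; the class and method names here are illustrative, not part of the revision:

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.SolrPingResponse;

public class SolrClientFactory {

	// before (SolrJ 5.x): solrServer = new CloudSolrServer(zkHost); ... solrServer.shutdown();
	// after (SolrJ 7.x): build via Builder, release via close()
	public static CloudSolrClient open(final String zkHost, final String collection) throws Exception {
		final CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkHost).build();
		client.connect();
		client.setParallelUpdates(true);
		client.setDefaultCollection(collection);

		// same liveness check the mappers perform: a non-zero ping status aborts initialization
		final SolrPingResponse rsp = client.ping();
		if (rsp.getStatus() != 0) {
			client.close();
			throw new SolrServerException("bad init status: " + rsp.getStatus());
		}
		return client;
	}
}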


modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMapper.java
 		outKey = new Text();
 		ibw = new ImmutableBytesWritable();
 
-		log.info("pace conf");
-		log.info("entity type: " + dedupConf.getWf().getEntityType());
-		log.info("clustering: " + dedupConf.getPace().getClustering());
-		log.info("conditions: " + dedupConf.getPace().getConditions());
-		log.info("fields: " + dedupConf.getPace().getModel());
-		log.info("blacklists: " + blackListMap);
+		//log.info("pace conf");
+		//log.info("entity type: " + dedupConf.getWf().getEntityType());
+		//log.info("clustering: " + dedupConf.getPace().getClustering());
+		//log.info("conditions: " + dedupConf.getPace().getConditions());
+		//log.info("fields: " + dedupConf.getPace().getModel());
+		//log.info("blacklists: " + blackListMap);
 		log.info("wf conf: " + dedupConf.toString());
 	}
 
......
 				// TODO: remove this hack - here because we want to dedup only publications and organizations
 				if (shouldDedup(entity)) {
 					final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
+					context.getCounter(entity.getType().toString(), "converted as MapDocument").increment(1);
 					emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
 				}
 			}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/IndexFeedMapper.java
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
......
 
 	private InputDocumentFactory documentFactory;
 
-	private CloudSolrServer solrServer;
+	private CloudSolrClient solrClient;
 
 	private String version;
 
......
 			try {
 				count++;
 				log.info("initializing solr server...");
-				solrServer = new CloudSolrServer(baseURL);
 
-				solrServer.connect();
+				solrClient = new CloudSolrClient.Builder().withZkHost(baseURL).build();
+				solrClient.connect();
 
-				solrServer.setParallelUpdates(true);
-				solrServer.setDefaultCollection(collection);
+				solrClient.setParallelUpdates(true);
+				solrClient.setDefaultCollection(collection);
 
-				final SolrPingResponse rsp = solrServer.ping();
+				final SolrPingResponse rsp = solrClient.ping();
 
 				if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus());
 				else {
......
 				}
 
 			} catch (final Throwable e) {
-				if (solrServer != null) {
-					solrServer.shutdown();
+				if (solrClient != null) {
+					solrClient.close();
 				}
 				context.getCounter("index init", e.getMessage()).increment(1);
 				log.error(String.format("failed to init solr client wait %dms, error:\n%s", backoffTimeMs, ExceptionUtils.getStackTrace(e)));
......
 	private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException {
 		if (!simulation) {
 			final long start = System.currentTimeMillis();
-			final UpdateResponse rsp = solrServer.add(buffer);
+			final UpdateResponse rsp = solrClient.add(buffer);
 			final long stop = System.currentTimeMillis() - start;
 			log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n");
 
......
 			}
 			log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown");
 			Thread.sleep(shutdownWaitTime);
-			solrServer.shutdown();
+			solrClient.close();
 		} catch (final SolrServerException e) {
 			log.error("couldn't shutdown server " + e.getMessage());
 		}
......
 	private void handleError(final Text key, final Text value, final Context context, final String indexRecord, final SolrInputDocument doc, final Throwable e)
 			throws IOException, InterruptedException {
 		context.getCounter("index feed", e.getClass().getName()).increment(1);
-		context.write(key, printRottenRecord(context.getTaskAttemptID().toString(), value, indexRecord, doc));
-		// e.printStackTrace(System.err);
+		context.write(key, printRottenRecord(context.getTaskAttemptID().toString(), value, indexRecord, doc, e));
 	}
 
-	private Text printRottenRecord(final String taskid, final Text value, final String indexRecord, final SolrInputDocument doc) {
-		return new Text("\n**********************************\n" + "task: " + taskid + "\n"
-				+ check("original", value.toString() + check("indexRecord", indexRecord) + check("solrDoc", doc)));
+	private Text printRottenRecord(final String taskid, final Text value, final String indexRecord, final SolrInputDocument doc, final Throwable e) {
+		return new Text("\n**********************************\n" + "task: " + taskid + "\n" +
+				check("original", value.toString()) +
+				check("indexRecord", indexRecord) +
+				check("solrDoc", doc) +
+				check("error", e));
 	}
 
 	private String check(final String label, final Object value) {
-		if ((value != null) && !value.toString().isEmpty()) return "\n " + label + ":\n" + value + "\n";
+		if ((value != null) && !value.toString().isEmpty()) {
+			return "\n " + label + ":\n" + value + "\n";
+		}
 		return "\n";
 	}
 
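In SolrJ 7.x, SolrClient implements Closeable, which is why the error and shutdown paths above move from shutdown() to close(). A client whose lifetime fits a single scope can therefore be managed with try-with-resources; the mappers here cannot use that form, since their client must live from setup() to cleanup(). A hypothetical one-shot feed under that assumption, with zkHost, collection, and docs as placeholders:

import java.util.List;

import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;

public class OneShotFeed {

	// the client is closed automatically when the try block exits, even on error
	public static void feed(final String zkHost, final String collection, final List<SolrInputDocument> docs) throws Exception {
		try (final CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkHost).build()) {
			client.setDefaultCollection(collection);
			final UpdateResponse rsp = client.add(docs);
			if (rsp.getStatus() != 0) {
				throw new IllegalStateException("add failed with status: " + rsp.getStatus());
			}
			client.commit();
		}
	}
}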
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/DedupIndexFeedMapper.java
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;
......
 
 	private static final Log log = LogFactory.getLog(DedupIndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-	private CloudSolrServer solrServer;
+	private CloudSolrClient solrClient;
 
 	private String dsId;
 
......
 		while (true) {
 			try {
 				log.info("initializing solr server...");
-				solrServer = new CloudSolrServer(baseURL);
+				solrClient = new CloudSolrClient.Builder().withZkHost(baseURL).build();
 
-				solrServer.connect();
+				solrClient.connect();
 
-				solrServer.setParallelUpdates(true);
-				solrServer.setDefaultCollection(collection);
+				solrClient.setParallelUpdates(true);
+				solrClient.setDefaultCollection(collection);
 
-				final SolrPingResponse rsp = solrServer.ping();
+				final SolrPingResponse rsp = solrClient.ping();
 
 				if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus());
 				else {
......
 				}
 
 			} catch (final Throwable e) {
-				if (solrServer != null) {
-					solrServer.shutdown();
+				if (solrClient != null) {
+					solrClient.close();
 				}
 				context.getCounter("index init", e.getMessage()).increment(1);
 				log.info(String.format("failed to init solr client wait %dms", backoffTimeMs));
......
 		}
 
 		final Oaf oaf = Oaf.parseFrom(bMap.get(DedupUtils.BODY_B));
+		if (oaf.getDataInfo().getInvisible()) {
+			context.getCounter(entityType, "invisible").increment(1);
+			return;
+		}
 
 		try {
 			doc = getDocument(oaf);
......
 	private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException {
 		if (!simulation) {
 			final long start = System.currentTimeMillis();
-			final UpdateResponse rsp = solrServer.add(buffer);
+			final UpdateResponse rsp = solrClient.add(buffer);
 			final long stop = System.currentTimeMillis() - start;
 			log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n");
 
......
 			}
 			log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown");
 			Thread.sleep(shutdownWaitTime);
-			solrServer.shutdown();
+			solrClient.close();
 		} catch (final SolrServerException e) {
 			System.err.println("couldn't shutdown server " + e.getMessage());
 		}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSimplifiedRecordsReducer.java
+package eu.dnetlib.data.mapreduce.hbase.dataexport;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class ExportSimplifiedRecordsReducer extends Reducer<Text, Text, Text, Text> {
+
+	private Text keyOut;
+
+	@Override
+	protected void setup(final Context context) throws IOException, InterruptedException {
+		keyOut = new Text("");
+	}
+
+	@Override
+	protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
+		for(final Text v : values) {
+			//keyOut.set(key.toString() + "@@@");
+			context.write(keyOut, v);
+		}
+	}
+
+}
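The reducer above writes an empty key so the exported file carries only the summary values produced by ExportSimplifiedRecordsMapper (the commented-out line is a disabled key-prefixed variant). A hypothetical driver wiring the pair together; the job name and output path are illustrative, and the input-side setup (the mapper reads from an HBase scan) is omitted here:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ExportSimplifiedRecordsDriver {

	public static Job createJob(final Configuration conf, final Path output) throws Exception {
		final Job job = Job.getInstance(conf, "export simplified records");
		job.setMapperClass(ExportSimplifiedRecordsMapper.class);
		job.setReducerClass(ExportSimplifiedRecordsReducer.class);
		// both mapper and reducer emit Text/Text pairs
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileOutputFormat.setOutputPath(job, output);
		return job;
	}
}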
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSimplifiedRecordsMapper.java
 
 		final String summary = recordSummarizer.evaluate(value.toString());
 		if (StringUtils.isNotBlank(summary)) {
+			keyOut.set(StringUtils.substringAfter(key.toString(), "::"));
 			valueOut.set(summary.replaceAll("\n","").replaceAll("\t",""));
 			context.write(keyOut, valueOut);
 		}
