Revision 51005
Added by Claudio Atzori over 6 years ago

using solrj 7.x client
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMapper.java

@@ -50,12 +50,12 @@
 		outKey = new Text();
 		ibw = new ImmutableBytesWritable();
 
-		log.info("pace conf");
-		log.info("entity type: " + dedupConf.getWf().getEntityType());
-		log.info("clustering: " + dedupConf.getPace().getClustering());
-		log.info("conditions: " + dedupConf.getPace().getConditions());
-		log.info("fields: " + dedupConf.getPace().getModel());
-		log.info("blacklists: " + blackListMap);
+		//log.info("pace conf");
+		//log.info("entity type: " + dedupConf.getWf().getEntityType());
+		//log.info("clustering: " + dedupConf.getPace().getClustering());
+		//log.info("conditions: " + dedupConf.getPace().getConditions());
+		//log.info("fields: " + dedupConf.getPace().getModel());
+		//log.info("blacklists: " + blackListMap);
 		log.info("wf conf: " + dedupConf.toString());
 	}
 
@@ -82,6 +82,7 @@
 		// TODO: remove this hack - here because we want to dedup only publications and organizazions
 		if (shouldDedup(entity)) {
 			final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
+			context.getCounter(entity.getType().toString(), "converted as MapDocument").increment(1);
 			emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
 		}
 	}
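Note: the only functional change in DedupMapper is the counter added at new line 85, which tracks how many entities per type were turned into a MapDocument. For reference, a minimal self-contained mapper illustrating that group/name counter pattern (class, group and counter names are illustrative, not taken from the project):

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Illustrative sketch only: shows how group/name counters are incremented from a mapper.
    public class CountingMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(final LongWritable key, final Text value, final Context context)
                throws IOException, InterruptedException {
            // counters are aggregated by the framework across all map tasks and reported in the job UI/logs
            context.getCounter("records", "processed").increment(1);
            context.write(new Text(key.toString()), value);
        }
    }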
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/IndexFeedMapper.java

@@ -22,7 +22,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;

@@ -34,7 +34,7 @@
 
 	private InputDocumentFactory documentFactory;
 
-	private CloudSolrServer solrServer;
+	private CloudSolrClient solrClient;
 
 	private String version;
 

@@ -93,14 +93,14 @@
 			try {
 				count++;
 				log.info("initializing solr server...");
-				solrServer = new CloudSolrServer(baseURL);
 
-				solrServer.connect();
+				solrClient = new CloudSolrClient.Builder().withZkHost(baseURL).build();
+				solrClient.connect();
 
-				solrServer.setParallelUpdates(true);
-				solrServer.setDefaultCollection(collection);
+				solrClient.setParallelUpdates(true);
+				solrClient.setDefaultCollection(collection);
 
-				final SolrPingResponse rsp = solrServer.ping();
+				final SolrPingResponse rsp = solrClient.ping();
 
 				if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus());
 				else {

@@ -108,8 +108,8 @@
 				}
 
 			} catch (final Throwable e) {
-				if (solrServer != null) {
-					solrServer.shutdown();
+				if (solrClient != null) {
+					solrClient.close();
 				}
 				context.getCounter("index init", e.getMessage()).increment(1);
 				log.error(String.format("failed to init solr client wait %dms, error:\n%s", backoffTimeMs, ExceptionUtils.getStackTrace(e)));
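Note: the setup hunks above are the core of the SolrJ 7.x migration: CloudSolrServer(baseURL) becomes a CloudSolrClient assembled through its Builder, and shutdown() becomes close(). A standalone sketch of the same initialization sequence; the ZooKeeper quorum and collection name are placeholders passed in as arguments:

    import java.io.IOException;

    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.response.SolrPingResponse;

    public class CloudSolrClientInitExample {

        public static void main(final String[] args) throws IOException, SolrServerException {
            final String zkQuorum = args[0];   // placeholder, e.g. "zk1:2181,zk2:2181,zk3:2181"
            final String collection = args[1]; // placeholder collection name

            // SolrJ 7.x: the client is built via a Builder instead of "new CloudSolrServer(url)"
            final CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkQuorum).build();
            try {
                client.connect();                    // eagerly connect to ZooKeeper
                client.setParallelUpdates(true);
                client.setDefaultCollection(collection);

                final SolrPingResponse rsp = client.ping();
                if (rsp.getStatus() != 0) {
                    throw new SolrServerException("bad init status: " + rsp.getStatus());
                }
            } finally {
                client.close();                      // replaces the old shutdown()
            }
        }
    }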
@@ -179,7 +179,7 @@
 	private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException {
 		if (!simulation) {
 			final long start = System.currentTimeMillis();
-			final UpdateResponse rsp = solrServer.add(buffer);
+			final UpdateResponse rsp = solrClient.add(buffer);
 			final long stop = System.currentTimeMillis() - start;
 			log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n");
 

@@ -205,7 +205,7 @@
 		}
 		log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown");
 		Thread.sleep(shutdownWaitTime);
-		solrServer.shutdown();
+		solrClient.close();
 	} catch (final SolrServerException e) {
 		log.error("couldn't shutdown server " + e.getMessage());
 	}

@@ -214,17 +214,21 @@
 	private void handleError(final Text key, final Text value, final Context context, final String indexRecord, final SolrInputDocument doc, final Throwable e)
 			throws IOException, InterruptedException {
 		context.getCounter("index feed", e.getClass().getName()).increment(1);
-		context.write(key, printRottenRecord(context.getTaskAttemptID().toString(), value, indexRecord, doc));
-		// e.printStackTrace(System.err);
+		context.write(key, printRottenRecord(context.getTaskAttemptID().toString(), value, indexRecord, doc, e));
 	}
 
-	private Text printRottenRecord(final String taskid, final Text value, final String indexRecord, final SolrInputDocument doc) {
-		return new Text("\n**********************************\n" + "task: " + taskid + "\n"
-				+ check("original", value.toString() + check("indexRecord", indexRecord) + check("solrDoc", doc)));
+	private Text printRottenRecord(final String taskid, final Text value, final String indexRecord, final SolrInputDocument doc, final Throwable e) {
+		return new Text("\n**********************************\n" + "task: " + taskid + "\n" +
+				check("original", value.toString() +
+				check("indexRecord", indexRecord) +
+				check("solrDoc", doc)) +
+				check("error", e));
 	}
 
 	private String check(final String label, final Object value) {
-		if ((value != null) && !value.toString().isEmpty()) return "\n " + label + ":\n" + value + "\n";
+		if ((value != null) && !value.toString().isEmpty()) {
+			return "\n " + label + ":\n" + value + "\n";
+		}
 		return "\n";
 	}
 
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/DedupIndexFeedMapper.java

@@ -23,7 +23,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrServer;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.response.SolrPingResponse;
 import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrInputDocument;

@@ -33,7 +33,7 @@
 
 	private static final Log log = LogFactory.getLog(DedupIndexFeedMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-	private CloudSolrServer solrServer;
+	private CloudSolrClient solrClient;
 
 	private String dsId;
 

@@ -97,14 +97,14 @@
 		while (true) {
 			try {
 				log.info("initializing solr server...");
-				solrServer = new CloudSolrServer(baseURL);
+				solrClient = new CloudSolrClient.Builder().withZkHost(baseURL).build();
 
-				solrServer.connect();
+				solrClient.connect();
 
-				solrServer.setParallelUpdates(true);
-				solrServer.setDefaultCollection(collection);
+				solrClient.setParallelUpdates(true);
+				solrClient.setDefaultCollection(collection);
 
-				final SolrPingResponse rsp = solrServer.ping();
+				final SolrPingResponse rsp = solrClient.ping();
 
 				if (rsp.getStatus() != 0) throw new SolrServerException("bad init status: " + rsp.getStatus());
 				else {

@@ -112,8 +112,8 @@
 				}
 
 			} catch (final Throwable e) {
-				if (solrServer != null) {
-					solrServer.shutdown();
+				if (solrClient != null) {
+					solrClient.close();
 				}
 				context.getCounter("index init", e.getMessage()).increment(1);
 				log.info(String.format("failed to init solr client wait %dms", backoffTimeMs));

@@ -135,6 +135,10 @@
 		}
 
 		final Oaf oaf = Oaf.parseFrom(bMap.get(DedupUtils.BODY_B));
+		if (oaf.getDataInfo().getInvisible()) {
+			context.getCounter(entityType, "invisible");
+			return;
+		}
 
 		try {
 			doc = getDocument(oaf);

@@ -182,7 +186,7 @@
 	private void doAdd(final List<SolrInputDocument> buffer, final Context context) throws SolrServerException, IOException {
 		if (!simulation) {
 			final long start = System.currentTimeMillis();
-			final UpdateResponse rsp = solrServer.add(buffer);
+			final UpdateResponse rsp = solrClient.add(buffer);
 			final long stop = System.currentTimeMillis() - start;
 			log.info("feed time for " + buffer.size() + " records : " + HumanTime.exactly(stop) + "\n");
 

@@ -203,7 +207,7 @@
 		}
 		log.info("\nwaiting " + shutdownWaitTime + "ms before shutdown");
 		Thread.sleep(shutdownWaitTime);
-		solrServer.shutdown();
+		solrClient.close();
 	} catch (final SolrServerException e) {
 		System.err.println("couldn't shutdown server " + e.getMessage());
 	}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSimplifiedRecordsReducer.java (new file)

@@ -0,0 +1,25 @@
+package eu.dnetlib.data.mapreduce.hbase.dataexport;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class ExportSimplifiedRecordsReducer extends Reducer<Text, Text, Text, Text> {
+
+	private Text keyOut;
+
+	@Override
+	protected void setup(final Context context) throws IOException, InterruptedException {
+		keyOut = new Text("");
+	}
+
+	@Override
+	protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
+		for(final Text v : values) {
+			//keyOut.set(key.toString() + "@@@");
+			context.write(keyOut, v);
+		}
+	}
+
+}
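Note: ExportSimplifiedRecordsReducer simply rewrites every value under an empty key. For orientation, a hypothetical driver sketch showing how such a Text/Text reducer is typically registered on a Hadoop Job; this class is not part of the revision, and the real job's input side (the HBase-backed ExportSimplifiedRecordsMapper) is omitted:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import eu.dnetlib.data.mapreduce.hbase.dataexport.ExportSimplifiedRecordsReducer;

    public class ExportSimplifiedRecordsDriver {

        public static void main(final String[] args) throws Exception {
            final Configuration conf = new Configuration();
            final Job job = Job.getInstance(conf, "export simplified records");
            job.setJarByClass(ExportSimplifiedRecordsDriver.class);
            // input side omitted: the real job feeds this reducer from an HBase table scan
            job.setReducerClass(ExportSimplifiedRecordsReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(args[0])); // output directory placeholder
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }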
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSimplifiedRecordsMapper.java

@@ -37,6 +37,7 @@
 
 		final String summary = recordSummarizer.evaluate(value.toString());
 		if (StringUtils.isNotBlank(summary)) {
+			keyOut.set(StringUtils.substringAfter(key.toString(), "::"));
 			valueOut.set(summary.replaceAll("\n","").replaceAll("\t",""));
 			context.write(keyOut, valueOut);
 		}
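Note: the new line 40 sets the output key to whatever follows the first "::" in the record key. A tiny example of StringUtils#substringAfter to illustrate (the key value is made up; commons-lang3 is used here for the standalone example, the older commons-lang variant behaves the same):

    import org.apache.commons.lang3.StringUtils;

    public class KeyPrefixExample {

        public static void main(final String[] args) {
            final String key = "somePrefix::someRecordIdentifier"; // made-up key, for illustration only
            // prints "someRecordIdentifier": everything after the first "::" separator
            System.out.println(StringUtils.substringAfter(key, "::"));
        }
    }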