Project

General

Profile

« Previous | Next » 

Revision 48920

upgraded solr version to 6.6.0, inherit from snapshot parent

View differences:

modules/dnet-deduplication/trunk/src/main/java/eu/dnetlib/data/dedup/DedupIndexDAO.java
1 1
package eu.dnetlib.data.dedup;
2 2

  
3
import java.io.IOException;
4
import java.util.*;
5
import java.util.stream.Collectors;
6
import javax.annotation.Resource;
7

  
3 8
import com.google.common.base.Function;
4 9
import com.google.common.base.Joiner;
5
import com.google.common.base.Predicate;
6 10
import com.google.common.base.Splitter;
7 11
import com.google.common.collect.Iterables;
8 12
import com.google.common.collect.Lists;
......
11 15
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12 16
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
13 17
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import eu.dnetlib.data.proto.OafProtos.Oaf.Builder;
15
import eu.dnetlib.data.proto.OafProtos.OafEntity;
16 18
import eu.dnetlib.data.transform.OafEntityMerger;
17 19
import eu.dnetlib.data.transform.SolrProtoMapper;
18 20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
......
29 31
import org.apache.commons.lang.StringUtils;
30 32
import org.apache.commons.logging.Log;
31 33
import org.apache.commons.logging.LogFactory;
32
import org.apache.solr.client.solrj.impl.CloudSolrServer;
34
import org.apache.solr.client.solrj.impl.CloudSolrClient;
33 35
import org.apache.solr.common.SolrInputDocument;
34 36
import org.dom4j.DocumentException;
35 37
import org.springframework.beans.factory.annotation.Autowired;
36 38
import org.springframework.beans.factory.annotation.Value;
37 39

  
38
import javax.annotation.Resource;
39
import java.io.IOException;
40
import java.util.*;
41

  
42 40
public class DedupIndexDAO {
43 41

  
44 42
	private static final Log log = LogFactory.getLog(DedupIndexDAO.class);
......
48 46
	private static final Map<String, Map<String, String>> paths = Maps.newHashMap();
49 47

  
50 48
	static {
51
		paths.put("result", new HashMap<String, String>());
52
		paths.put("organization", new HashMap<String, String>());
53
		paths.put("person", new HashMap<String, String>());
49
		paths.put("result", new HashMap<>());
50
		paths.put("organization", new HashMap<>());
51
		paths.put("person", new HashMap<>());
54 52

  
55 53
		paths.get("result").put("provenance", "collectedfrom/value");
56 54
		paths.get("organization").put("provenance", "collectedfrom/value");
......
119 117
		int commitStatus = 0;
120 118
		int addStatus = 0;
121 119

  
122
		final CloudSolrServer solrServer = getSolrServer();
123

  
124 120
		log.info("starting index update");
125 121

  
126
		try {
122
		try(final CloudSolrClient solrServer = getSolrServer()) {
127 123
			final SolrProtoMapper mapper = initProtoMapper();
128 124

  
129 125
			final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
......
151 147

  
152 148
			// delete the old representatives, avoiding to remove the current one (if it didn't change)
153 149
			log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
154
			for (final String rootId : Iterables.filter(group.getRootIds(), new Predicate<String>() {
155

  
156
				@Override
157
				public boolean apply(final String rootId) {
158
					return !rootId.equals(newRootId);
159
				}
160
			})) {
150
			for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
161 151
				solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet()));
162 152
			}
163 153

  
164 154
			commitStatus = solrServer.commit().getStatus();
165 155

  
166 156
			log.debug("solr commit status: " + commitStatus);
167
		} finally {
168
			log.debug("closing solr zk client");
169
			solrServer.getZkStateReader().close();
170 157
		}
171 158

  
172 159
		return (addStatus == 0) && (commitStatus == 0);
......
181 168
	}
182 169

  
183 170
	private Function<String, Oaf> getXml2OafFunction() {
184
		return new Function<String, Oaf>() {
185

  
186
			@Override
187
			public Oaf apply(final String s) {
188
				// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
189
				final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
190
				try {
191
					final byte[] oafBytes = Base64.decodeBase64(base64);
192
					final Oaf oaf = OafDecoder.decode(oafBytes).getOaf();
193
					return oaf;
194
				} catch (final Throwable e) {
195
					throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
196
				}
171
		return s -> {
172
			// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
173
			final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
174
			try {
175
				final byte[] oafBytes = Base64.decodeBase64(base64);
176
				final Oaf oaf = OafDecoder.decode(oafBytes).getOaf();
177
				return oaf;
178
			} catch (final Throwable e) {
179
				throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
197 180
			}
198 181
		};
199 182
	}
......
207 190
										+ "']//LAYOUT[@name='index']/FIELDS"));
208 191
	}
209 192

  
210
	private CloudSolrServer getSolrServer() {
193
	private CloudSolrClient getSolrServer() {
211 194
		final String zk = getIndexSolrUrlZk();
212 195
		log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk));
213
		final CloudSolrServer solrServer = new CloudSolrServer(zk);
196
		final CloudSolrClient solrServer = new CloudSolrClient.Builder().withZkHost(zk).build();
214 197
		solrServer.setDefaultCollection(dedupIndexCollection);
215 198

  
216 199
		return solrServer;
......
233 216
	}
234 217

  
235 218
	private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
236
		return new Function<Oaf, Map<String, String>>() {
219
		return oaf -> {
237 220

  
238
			@Override
239
			public Map<String, String> apply(final Oaf oaf) {
221
			final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
222
			final Map<String, String> res = Maps.newHashMap();
223
			final String oafId = cleanId(oaf.getEntity().getId());
240 224

  
241
				final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
242
				final Map<String, String> res = Maps.newHashMap();
243
				final String oafId = cleanId(oaf.getEntity().getId());
244
				final List<String> idList = Lists.newArrayList(Iterables.transform(oaf.getEntity().getChildrenList(), new Function<OafEntity, String>() {
225
			final List<String> idList = oaf.getEntity().getChildrenList().stream()
226
					.map(e -> e.getId())
227
					.map(s -> cleanId(s))
228
					.collect(Collectors.toList());
229
			if (idList.isEmpty()) {
230
				idList.add(oafId);
231
			}
232
			res.put("id", oafId);
233
			res.put("idList", Joiner.on(",").join(idList));
234
			res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
245 235

  
246
					@Override
247
					public String apply(final OafEntity e) {
248
						return cleanId(e.getId());
249
					}
250
				}));
251
				if (idList.isEmpty()) {
252
					idList.add(oafId);
253
				}
254
				res.put("id", oafId);
255
				res.put("idList", Joiner.on(",").join(idList));
256
				res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
236
			for (final String fieldName : fields) {
237
				res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
238
			}
257 239

  
258
				for (final String fieldName : fields) {
259
					res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
260
				}
261

  
262
				return res;
263
			}
240
			return res;
264 241
		};
265 242
	}
266 243

  
......
276 253
	}
277 254

  
278 255
	private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
279
		return Iterables.transform(ids, new Function<String, String>() {
256
		return Iterables.transform(ids, id -> {
257
			try {
258
				final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
259
				final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
280 260

  
281
			@Override
282
			public String apply(final String id) {
283
				try {
284
					final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
285
					final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
261
				log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
286 262

  
287
					log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
288

  
289
					return Iterables.getOnlyElement(rsp.getRecords());
290
				} catch (final Throwable e) {
291
					log.error(e);
292
					throw new RuntimeException("unable to query id: " + id, e);
293
				}
263
				return Iterables.getOnlyElement(rsp.getRecords());
264
			} catch (final Throwable e) {
265
				log.error(e);
266
				throw new RuntimeException("unable to query id: " + id, e);
294 267
			}
295 268
		});
296 269
	}
297 270

  
298 271
	private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
299
		return Lists.newArrayList(Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() {
300

  
301
			@Override
302
			public Oaf apply(final Oaf.Builder builder) {
303
				// TODO add more changes to the Oaf object here as needed.
304
				builder.getDataInfoBuilder().setDeletedbyinference(true);
305
				return builder.build();
306
			}
272
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
273
			// TODO add more changes to the Oaf object here as needed.
274
			builder.getDataInfoBuilder().setDeletedbyinference(true);
275
			return builder.build();
307 276
		}));
308 277
	}
309 278

  
310 279
	private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
311
		return Lists.newArrayList(Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() {
312

  
313
			@Override
314
			public Oaf apply(final Oaf.Builder builder) {
315
				// TODO add more changes to the Oaf object here as needed.
316
				builder.getDataInfoBuilder().setDeletedbyinference(false);
317
				return builder.build();
318
			}
280
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
281
			// TODO add more changes to the Oaf object here as needed.
282
			builder.getDataInfoBuilder().setDeletedbyinference(false);
283
			return builder.build();
319 284
		}));
320 285
	}
321 286

  
322 287
	private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
323
		return Iterables.transform(oaf, new Function<Oaf, Oaf.Builder>() {
324

  
325
			@Override
326
			public Builder apply(final Oaf oaf) {
327
				return Oaf.newBuilder(oaf);
328
			}
329
		});
288
		return Iterables.transform(oaf, oaf1 -> Oaf.newBuilder(oaf1));
330 289
	}
331 290

  
332 291
	private String newRootId(final SimilarityGroup group) {
......
339 298
	}
340 299

  
341 300
	private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) {
342
		return new Function<Oaf, SolrInputDocument>() {
343

  
344
			@Override
345
			public SolrInputDocument apply(final Oaf oaf) {
346
				try {
347
					return mapper.map(oaf, group.getDate(), "", group.getActionSet());
348
				} catch (final Throwable e) {
349
					throw new IllegalArgumentException("unable to map proto to index document", e);
350
				}
301
		return oaf -> {
302
			try {
303
				return mapper.map(oaf, group.getDate(), "", group.getActionSet());
304
			} catch (final Throwable e) {
305
				throw new IllegalArgumentException("unable to map proto to index document", e);
351 306
			}
352 307
		};
353 308
	}
modules/dnet-deduplication/trunk/pom.xml
3 3
	<parent>
4 4
		<groupId>eu.dnetlib</groupId>
5 5
		<artifactId>dnet45-parent</artifactId>
6
		<version>1.0.0</version>
6
		<version>1.0.0-SNAPSHOT</version>
7 7
		<relativePath />
8 8
	</parent>
9 9
	<modelVersion>4.0.0</modelVersion>

Also available in: Unified diff