Revision 48920
Added by Claudio Atzori over 6 years ago
DedupIndexDAO.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.dedup; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
4 |
import java.util.*; |
|
5 |
import java.util.stream.Collectors; |
|
6 |
import javax.annotation.Resource; |
|
7 |
|
|
3 | 8 |
import com.google.common.base.Function; |
4 | 9 |
import com.google.common.base.Joiner; |
5 |
import com.google.common.base.Predicate; |
|
6 | 10 |
import com.google.common.base.Splitter; |
7 | 11 |
import com.google.common.collect.Iterables; |
8 | 12 |
import com.google.common.collect.Lists; |
... | ... | |
11 | 15 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
12 | 16 |
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder; |
13 | 17 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
14 |
import eu.dnetlib.data.proto.OafProtos.Oaf.Builder; |
|
15 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
16 | 18 |
import eu.dnetlib.data.transform.OafEntityMerger; |
17 | 19 |
import eu.dnetlib.data.transform.SolrProtoMapper; |
18 | 20 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException; |
... | ... | |
29 | 31 |
import org.apache.commons.lang.StringUtils; |
30 | 32 |
import org.apache.commons.logging.Log; |
31 | 33 |
import org.apache.commons.logging.LogFactory; |
32 |
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
|
34 |
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
|
33 | 35 |
import org.apache.solr.common.SolrInputDocument; |
34 | 36 |
import org.dom4j.DocumentException; |
35 | 37 |
import org.springframework.beans.factory.annotation.Autowired; |
36 | 38 |
import org.springframework.beans.factory.annotation.Value; |
37 | 39 |
|
38 |
import javax.annotation.Resource; |
|
39 |
import java.io.IOException; |
|
40 |
import java.util.*; |
|
41 |
|
|
42 | 40 |
public class DedupIndexDAO { |
43 | 41 |
|
44 | 42 |
private static final Log log = LogFactory.getLog(DedupIndexDAO.class); |
... | ... | |
48 | 46 |
private static final Map<String, Map<String, String>> paths = Maps.newHashMap(); |
49 | 47 |
|
50 | 48 |
static { |
51 |
paths.put("result", new HashMap<String, String>());
|
|
52 |
paths.put("organization", new HashMap<String, String>());
|
|
53 |
paths.put("person", new HashMap<String, String>());
|
|
49 |
paths.put("result", new HashMap<>()); |
|
50 |
paths.put("organization", new HashMap<>()); |
|
51 |
paths.put("person", new HashMap<>()); |
|
54 | 52 |
|
55 | 53 |
paths.get("result").put("provenance", "collectedfrom/value"); |
56 | 54 |
paths.get("organization").put("provenance", "collectedfrom/value"); |
... | ... | |
119 | 117 |
int commitStatus = 0; |
120 | 118 |
int addStatus = 0; |
121 | 119 |
|
122 |
final CloudSolrServer solrServer = getSolrServer(); |
|
123 |
|
|
124 | 120 |
log.info("starting index update"); |
125 | 121 |
|
126 |
try { |
|
122 |
try(final CloudSolrClient solrServer = getSolrServer()) {
|
|
127 | 123 |
final SolrProtoMapper mapper = initProtoMapper(); |
128 | 124 |
|
129 | 125 |
final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper); |
... | ... | |
151 | 147 |
|
152 | 148 |
// delete the old representatives, taking care not to remove the current one (if it didn't change) |
153 | 149 |
log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection)); |
154 |
for (final String rootId : Iterables.filter(group.getRootIds(), new Predicate<String>() { |
|
155 |
|
|
156 |
@Override |
|
157 |
public boolean apply(final String rootId) { |
|
158 |
return !rootId.equals(newRootId); |
|
159 |
} |
|
160 |
})) { |
|
150 |
for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) { |
|
161 | 151 |
solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet())); |
162 | 152 |
} |
163 | 153 |
|
164 | 154 |
commitStatus = solrServer.commit().getStatus(); |
165 | 155 |
|
166 | 156 |
log.debug("solr commit status: " + commitStatus); |
167 |
} finally { |
|
168 |
log.debug("closing solr zk client"); |
|
169 |
solrServer.getZkStateReader().close(); |
|
170 | 157 |
} |
171 | 158 |
|
172 | 159 |
return (addStatus == 0) && (commitStatus == 0); |
... | ... | |
181 | 168 |
} |
182 | 169 |
|
183 | 170 |
private Function<String, Oaf> getXml2OafFunction() { |
184 |
return new Function<String, Oaf>() { |
|
185 |
|
|
186 |
@Override |
|
187 |
public Oaf apply(final String s) { |
|
188 |
// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", ""); |
|
189 |
final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<"); |
|
190 |
try { |
|
191 |
final byte[] oafBytes = Base64.decodeBase64(base64); |
|
192 |
final Oaf oaf = OafDecoder.decode(oafBytes).getOaf(); |
|
193 |
return oaf; |
|
194 |
} catch (final Throwable e) { |
|
195 |
throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64); |
|
196 |
} |
|
171 |
return s -> { |
|
172 |
// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", ""); |
|
173 |
final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<"); |
|
174 |
try { |
|
175 |
final byte[] oafBytes = Base64.decodeBase64(base64); |
|
176 |
final Oaf oaf = OafDecoder.decode(oafBytes).getOaf(); |
|
177 |
return oaf; |
|
178 |
} catch (final Throwable e) { |
|
179 |
throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64); |
|
197 | 180 |
} |
198 | 181 |
}; |
199 | 182 |
} |
... | ... | |
207 | 190 |
+ "']//LAYOUT[@name='index']/FIELDS")); |
208 | 191 |
} |
209 | 192 |
|
210 |
private CloudSolrServer getSolrServer() {
|
|
193 |
private CloudSolrClient getSolrServer() {
|
|
211 | 194 |
final String zk = getIndexSolrUrlZk(); |
212 | 195 |
log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk)); |
213 |
final CloudSolrServer solrServer = new CloudSolrServer(zk);
|
|
196 |
final CloudSolrClient solrServer = new CloudSolrClient.Builder().withZkHost(zk).build();
|
|
214 | 197 |
solrServer.setDefaultCollection(dedupIndexCollection); |
215 | 198 |
|
216 | 199 |
return solrServer; |
... | ... | |
233 | 216 |
} |
234 | 217 |
|
235 | 218 |
private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) { |
236 |
return new Function<Oaf, Map<String, String>>() {
|
|
219 |
return oaf -> {
|
|
237 | 220 |
|
238 |
@Override |
|
239 |
public Map<String, String> apply(final Oaf oaf) { |
|
221 |
final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity(); |
|
222 |
final Map<String, String> res = Maps.newHashMap(); |
|
223 |
final String oafId = cleanId(oaf.getEntity().getId()); |
|
240 | 224 |
|
241 |
final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity(); |
|
242 |
final Map<String, String> res = Maps.newHashMap(); |
|
243 |
final String oafId = cleanId(oaf.getEntity().getId()); |
|
244 |
final List<String> idList = Lists.newArrayList(Iterables.transform(oaf.getEntity().getChildrenList(), new Function<OafEntity, String>() { |
|
225 |
final List<String> idList = oaf.getEntity().getChildrenList().stream() |
|
226 |
.map(e -> e.getId()) |
|
227 |
.map(s -> cleanId(s)) |
|
228 |
.collect(Collectors.toList()); |
|
229 |
if (idList.isEmpty()) { |
|
230 |
idList.add(oafId); |
|
231 |
} |
|
232 |
res.put("id", oafId); |
|
233 |
res.put("idList", Joiner.on(",").join(idList)); |
|
234 |
res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + ""); |
|
245 | 235 |
|
246 |
@Override |
|
247 |
public String apply(final OafEntity e) { |
|
248 |
return cleanId(e.getId()); |
|
249 |
} |
|
250 |
})); |
|
251 |
if (idList.isEmpty()) { |
|
252 |
idList.add(oafId); |
|
253 |
} |
|
254 |
res.put("id", oafId); |
|
255 |
res.put("idList", Joiner.on(",").join(idList)); |
|
256 |
res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + ""); |
|
236 |
for (final String fieldName : fields) { |
|
237 |
res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName)))); |
|
238 |
} |
|
257 | 239 |
|
258 |
for (final String fieldName : fields) { |
|
259 |
res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName)))); |
|
260 |
} |
|
261 |
|
|
262 |
return res; |
|
263 |
} |
|
240 |
return res; |
|
264 | 241 |
}; |
265 | 242 |
} |
266 | 243 |
|
... | ... | |
276 | 253 |
} |
277 | 254 |
|
278 | 255 |
private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) { |
279 |
return Iterables.transform(ids, new Function<String, String>() { |
|
256 |
return Iterables.transform(ids, id -> { |
|
257 |
try { |
|
258 |
final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\""; |
|
259 |
final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1); |
|
280 | 260 |
|
281 |
@Override |
|
282 |
public String apply(final String id) { |
|
283 |
try { |
|
284 |
final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\""; |
|
285 |
final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1); |
|
261 |
log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal())); |
|
286 | 262 |
|
287 |
log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal())); |
|
288 |
|
|
289 |
return Iterables.getOnlyElement(rsp.getRecords()); |
|
290 |
} catch (final Throwable e) { |
|
291 |
log.error(e); |
|
292 |
throw new RuntimeException("unable to query id: " + id, e); |
|
293 |
} |
|
263 |
return Iterables.getOnlyElement(rsp.getRecords()); |
|
264 |
} catch (final Throwable e) { |
|
265 |
log.error(e); |
|
266 |
throw new RuntimeException("unable to query id: " + id, e); |
|
294 | 267 |
} |
295 | 268 |
}); |
296 | 269 |
} |
297 | 270 |
|
298 | 271 |
private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) { |
299 |
return Lists.newArrayList(Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() { |
|
300 |
|
|
301 |
@Override |
|
302 |
public Oaf apply(final Oaf.Builder builder) { |
|
303 |
// TODO add more changes to the Oaf object here as needed. |
|
304 |
builder.getDataInfoBuilder().setDeletedbyinference(true); |
|
305 |
return builder.build(); |
|
306 |
} |
|
272 |
return Lists.newArrayList(Iterables.transform(builders, builder -> { |
|
273 |
// TODO add more changes to the Oaf object here as needed. |
|
274 |
builder.getDataInfoBuilder().setDeletedbyinference(true); |
|
275 |
return builder.build(); |
|
307 | 276 |
})); |
308 | 277 |
} |
309 | 278 |
|
310 | 279 |
private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) { |
311 |
return Lists.newArrayList(Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() { |
|
312 |
|
|
313 |
@Override |
|
314 |
public Oaf apply(final Oaf.Builder builder) { |
|
315 |
// TODO add more changes to the Oaf object here as needed. |
|
316 |
builder.getDataInfoBuilder().setDeletedbyinference(false); |
|
317 |
return builder.build(); |
|
318 |
} |
|
280 |
return Lists.newArrayList(Iterables.transform(builders, builder -> { |
|
281 |
// TODO add more changes to the Oaf object here as needed. |
|
282 |
builder.getDataInfoBuilder().setDeletedbyinference(false); |
|
283 |
return builder.build(); |
|
319 | 284 |
})); |
320 | 285 |
} |
321 | 286 |
|
322 | 287 |
private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) { |
323 |
return Iterables.transform(oaf, new Function<Oaf, Oaf.Builder>() { |
|
324 |
|
|
325 |
@Override |
|
326 |
public Builder apply(final Oaf oaf) { |
|
327 |
return Oaf.newBuilder(oaf); |
|
328 |
} |
|
329 |
}); |
|
288 |
return Iterables.transform(oaf, oaf1 -> Oaf.newBuilder(oaf1)); |
|
330 | 289 |
} |
331 | 290 |
|
332 | 291 |
private String newRootId(final SimilarityGroup group) { |
... | ... | |
339 | 298 |
} |
340 | 299 |
|
341 | 300 |
private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) { |
342 |
return new Function<Oaf, SolrInputDocument>() { |
|
343 |
|
|
344 |
@Override |
|
345 |
public SolrInputDocument apply(final Oaf oaf) { |
|
346 |
try { |
|
347 |
return mapper.map(oaf, group.getDate(), "", group.getActionSet()); |
|
348 |
} catch (final Throwable e) { |
|
349 |
throw new IllegalArgumentException("unable to map proto to index document", e); |
|
350 |
} |
|
301 |
return oaf -> { |
|
302 |
try { |
|
303 |
return mapper.map(oaf, group.getDate(), "", group.getActionSet()); |
|
304 |
} catch (final Throwable e) { |
|
305 |
throw new IllegalArgumentException("unable to map proto to index document", e); |
|
351 | 306 |
} |
352 | 307 |
}; |
353 | 308 |
} |
Also available in: Unified diff
Upgraded Solr version to 6.6.0; now inherits from the snapshot parent.