1 |
1 |
package eu.dnetlib.data.dedup;
|
2 |
2 |
|
|
3 |
import java.io.IOException;
|
|
4 |
import java.util.*;
|
|
5 |
import java.util.stream.Collectors;
|
|
6 |
import javax.annotation.Resource;
|
|
7 |
|
3 |
8 |
import com.google.common.base.Function;
|
4 |
9 |
import com.google.common.base.Joiner;
|
5 |
|
import com.google.common.base.Predicate;
|
6 |
10 |
import com.google.common.base.Splitter;
|
7 |
11 |
import com.google.common.collect.Iterables;
|
8 |
12 |
import com.google.common.collect.Lists;
|
... | ... | |
11 |
15 |
import eu.dnetlib.data.mapreduce.util.OafDecoder;
|
12 |
16 |
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
|
13 |
17 |
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
14 |
|
import eu.dnetlib.data.proto.OafProtos.Oaf.Builder;
|
15 |
|
import eu.dnetlib.data.proto.OafProtos.OafEntity;
|
16 |
18 |
import eu.dnetlib.data.transform.OafEntityMerger;
|
17 |
19 |
import eu.dnetlib.data.transform.SolrProtoMapper;
|
18 |
20 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
|
... | ... | |
29 |
31 |
import org.apache.commons.lang.StringUtils;
|
30 |
32 |
import org.apache.commons.logging.Log;
|
31 |
33 |
import org.apache.commons.logging.LogFactory;
|
32 |
|
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
|
34 |
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
33 |
35 |
import org.apache.solr.common.SolrInputDocument;
|
34 |
36 |
import org.dom4j.DocumentException;
|
35 |
37 |
import org.springframework.beans.factory.annotation.Autowired;
|
36 |
38 |
import org.springframework.beans.factory.annotation.Value;
|
37 |
39 |
|
38 |
|
import javax.annotation.Resource;
|
39 |
|
import java.io.IOException;
|
40 |
|
import java.util.*;
|
41 |
|
|
42 |
40 |
public class DedupIndexDAO {
|
43 |
41 |
|
44 |
42 |
private static final Log log = LogFactory.getLog(DedupIndexDAO.class);
|
... | ... | |
48 |
46 |
private static final Map<String, Map<String, String>> paths = Maps.newHashMap();
|
49 |
47 |
|
50 |
48 |
static {
|
51 |
|
paths.put("result", new HashMap<String, String>());
|
52 |
|
paths.put("organization", new HashMap<String, String>());
|
53 |
|
paths.put("person", new HashMap<String, String>());
|
|
49 |
paths.put("result", new HashMap<>());
|
|
50 |
paths.put("organization", new HashMap<>());
|
|
51 |
paths.put("person", new HashMap<>());
|
54 |
52 |
|
55 |
53 |
paths.get("result").put("provenance", "collectedfrom/value");
|
56 |
54 |
paths.get("organization").put("provenance", "collectedfrom/value");
|
... | ... | |
119 |
117 |
int commitStatus = 0;
|
120 |
118 |
int addStatus = 0;
|
121 |
119 |
|
122 |
|
final CloudSolrServer solrServer = getSolrServer();
|
123 |
|
|
124 |
120 |
log.info("starting index update");
|
125 |
121 |
|
126 |
|
try {
|
|
122 |
try(final CloudSolrClient solrServer = getSolrServer()) {
|
127 |
123 |
final SolrProtoMapper mapper = initProtoMapper();
|
128 |
124 |
|
129 |
125 |
final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
|
... | ... | |
151 |
147 |
|
152 |
148 |
// delete the old representatives, taking care not to remove the current one (if it didn't change)
|
153 |
149 |
log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
|
154 |
|
for (final String rootId : Iterables.filter(group.getRootIds(), new Predicate<String>() {
|
155 |
|
|
156 |
|
@Override
|
157 |
|
public boolean apply(final String rootId) {
|
158 |
|
return !rootId.equals(newRootId);
|
159 |
|
}
|
160 |
|
})) {
|
|
150 |
for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
|
161 |
151 |
solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet()));
|
162 |
152 |
}
|
163 |
153 |
|
164 |
154 |
commitStatus = solrServer.commit().getStatus();
|
165 |
155 |
|
166 |
156 |
log.debug("solr commit status: " + commitStatus);
|
167 |
|
} finally {
|
168 |
|
log.debug("closing solr zk client");
|
169 |
|
solrServer.getZkStateReader().close();
|
170 |
157 |
}
|
171 |
158 |
|
172 |
159 |
return (addStatus == 0) && (commitStatus == 0);
|
... | ... | |
181 |
168 |
}
|
182 |
169 |
|
183 |
170 |
private Function<String, Oaf> getXml2OafFunction() {
|
184 |
|
return new Function<String, Oaf>() {
|
185 |
|
|
186 |
|
@Override
|
187 |
|
public Oaf apply(final String s) {
|
188 |
|
// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
|
189 |
|
final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
|
190 |
|
try {
|
191 |
|
final byte[] oafBytes = Base64.decodeBase64(base64);
|
192 |
|
final Oaf oaf = OafDecoder.decode(oafBytes).getOaf();
|
193 |
|
return oaf;
|
194 |
|
} catch (final Throwable e) {
|
195 |
|
throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
|
196 |
|
}
|
|
171 |
return s -> {
|
|
172 |
// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
|
|
173 |
final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
|
|
174 |
try {
|
|
175 |
final byte[] oafBytes = Base64.decodeBase64(base64);
|
|
176 |
final Oaf oaf = OafDecoder.decode(oafBytes).getOaf();
|
|
177 |
return oaf;
|
|
178 |
} catch (final Throwable e) {
|
|
179 |
throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
|
197 |
180 |
}
|
198 |
181 |
};
|
199 |
182 |
}
|
... | ... | |
207 |
190 |
+ "']//LAYOUT[@name='index']/FIELDS"));
|
208 |
191 |
}
|
209 |
192 |
|
210 |
|
private CloudSolrServer getSolrServer() {
|
|
193 |
private CloudSolrClient getSolrServer() {
|
211 |
194 |
final String zk = getIndexSolrUrlZk();
|
212 |
195 |
log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk));
|
213 |
|
final CloudSolrServer solrServer = new CloudSolrServer(zk);
|
|
196 |
final CloudSolrClient solrServer = new CloudSolrClient.Builder().withZkHost(zk).build();
|
214 |
197 |
solrServer.setDefaultCollection(dedupIndexCollection);
|
215 |
198 |
|
216 |
199 |
return solrServer;
|
... | ... | |
233 |
216 |
}
|
234 |
217 |
|
235 |
218 |
private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
|
236 |
|
return new Function<Oaf, Map<String, String>>() {
|
|
219 |
return oaf -> {
|
237 |
220 |
|
238 |
|
@Override
|
239 |
|
public Map<String, String> apply(final Oaf oaf) {
|
|
221 |
final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
|
|
222 |
final Map<String, String> res = Maps.newHashMap();
|
|
223 |
final String oafId = cleanId(oaf.getEntity().getId());
|
240 |
224 |
|
241 |
|
final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
|
242 |
|
final Map<String, String> res = Maps.newHashMap();
|
243 |
|
final String oafId = cleanId(oaf.getEntity().getId());
|
244 |
|
final List<String> idList = Lists.newArrayList(Iterables.transform(oaf.getEntity().getChildrenList(), new Function<OafEntity, String>() {
|
|
225 |
final List<String> idList = oaf.getEntity().getChildrenList().stream()
|
|
226 |
.map(e -> e.getId())
|
|
227 |
.map(s -> cleanId(s))
|
|
228 |
.collect(Collectors.toList());
|
|
229 |
if (idList.isEmpty()) {
|
|
230 |
idList.add(oafId);
|
|
231 |
}
|
|
232 |
res.put("id", oafId);
|
|
233 |
res.put("idList", Joiner.on(",").join(idList));
|
|
234 |
res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
|
245 |
235 |
|
246 |
|
@Override
|
247 |
|
public String apply(final OafEntity e) {
|
248 |
|
return cleanId(e.getId());
|
249 |
|
}
|
250 |
|
}));
|
251 |
|
if (idList.isEmpty()) {
|
252 |
|
idList.add(oafId);
|
253 |
|
}
|
254 |
|
res.put("id", oafId);
|
255 |
|
res.put("idList", Joiner.on(",").join(idList));
|
256 |
|
res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
|
|
236 |
for (final String fieldName : fields) {
|
|
237 |
res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
|
|
238 |
}
|
257 |
239 |
|
258 |
|
for (final String fieldName : fields) {
|
259 |
|
res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
|
260 |
|
}
|
261 |
|
|
262 |
|
return res;
|
263 |
|
}
|
|
240 |
return res;
|
264 |
241 |
};
|
265 |
242 |
}
|
266 |
243 |
|
... | ... | |
276 |
253 |
}
|
277 |
254 |
|
278 |
255 |
private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
|
279 |
|
return Iterables.transform(ids, new Function<String, String>() {
|
|
256 |
return Iterables.transform(ids, id -> {
|
|
257 |
try {
|
|
258 |
final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
|
|
259 |
final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
|
280 |
260 |
|
281 |
|
@Override
|
282 |
|
public String apply(final String id) {
|
283 |
|
try {
|
284 |
|
final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
|
285 |
|
final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
|
|
261 |
log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
|
286 |
262 |
|
287 |
|
log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
|
288 |
|
|
289 |
|
return Iterables.getOnlyElement(rsp.getRecords());
|
290 |
|
} catch (final Throwable e) {
|
291 |
|
log.error(e);
|
292 |
|
throw new RuntimeException("unable to query id: " + id, e);
|
293 |
|
}
|
|
263 |
return Iterables.getOnlyElement(rsp.getRecords());
|
|
264 |
} catch (final Throwable e) {
|
|
265 |
log.error(e);
|
|
266 |
throw new RuntimeException("unable to query id: " + id, e);
|
294 |
267 |
}
|
295 |
268 |
});
|
296 |
269 |
}
|
297 |
270 |
|
298 |
271 |
private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
|
299 |
|
return Lists.newArrayList(Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() {
|
300 |
|
|
301 |
|
@Override
|
302 |
|
public Oaf apply(final Oaf.Builder builder) {
|
303 |
|
// TODO add more changes to the Oaf object here as needed.
|
304 |
|
builder.getDataInfoBuilder().setDeletedbyinference(true);
|
305 |
|
return builder.build();
|
306 |
|
}
|
|
272 |
return Lists.newArrayList(Iterables.transform(builders, builder -> {
|
|
273 |
// TODO add more changes to the Oaf object here as needed.
|
|
274 |
builder.getDataInfoBuilder().setDeletedbyinference(true);
|
|
275 |
return builder.build();
|
307 |
276 |
}));
|
308 |
277 |
}
|
309 |
278 |
|
310 |
279 |
private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
|
311 |
|
return Lists.newArrayList(Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() {
|
312 |
|
|
313 |
|
@Override
|
314 |
|
public Oaf apply(final Oaf.Builder builder) {
|
315 |
|
// TODO add more changes to the Oaf object here as needed.
|
316 |
|
builder.getDataInfoBuilder().setDeletedbyinference(false);
|
317 |
|
return builder.build();
|
318 |
|
}
|
|
280 |
return Lists.newArrayList(Iterables.transform(builders, builder -> {
|
|
281 |
// TODO add more changes to the Oaf object here as needed.
|
|
282 |
builder.getDataInfoBuilder().setDeletedbyinference(false);
|
|
283 |
return builder.build();
|
319 |
284 |
}));
|
320 |
285 |
}
|
321 |
286 |
|
322 |
287 |
private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
|
323 |
|
return Iterables.transform(oaf, new Function<Oaf, Oaf.Builder>() {
|
324 |
|
|
325 |
|
@Override
|
326 |
|
public Builder apply(final Oaf oaf) {
|
327 |
|
return Oaf.newBuilder(oaf);
|
328 |
|
}
|
329 |
|
});
|
|
288 |
return Iterables.transform(oaf, oaf1 -> Oaf.newBuilder(oaf1));
|
330 |
289 |
}
|
331 |
290 |
|
332 |
291 |
private String newRootId(final SimilarityGroup group) {
|
... | ... | |
339 |
298 |
}
|
340 |
299 |
|
341 |
300 |
private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) {
|
342 |
|
return new Function<Oaf, SolrInputDocument>() {
|
343 |
|
|
344 |
|
@Override
|
345 |
|
public SolrInputDocument apply(final Oaf oaf) {
|
346 |
|
try {
|
347 |
|
return mapper.map(oaf, group.getDate(), "", group.getActionSet());
|
348 |
|
} catch (final Throwable e) {
|
349 |
|
throw new IllegalArgumentException("unable to map proto to index document", e);
|
350 |
|
}
|
|
301 |
return oaf -> {
|
|
302 |
try {
|
|
303 |
return mapper.map(oaf, group.getDate(), "", group.getActionSet());
|
|
304 |
} catch (final Throwable e) {
|
|
305 |
throw new IllegalArgumentException("unable to map proto to index document", e);
|
351 |
306 |
}
|
352 |
307 |
};
|
353 |
308 |
}
|
Upgraded Solr version to 6.6.0; inherit from snapshot parent.