Project

General

Profile

1
package eu.dnetlib.functionality.modular.ui.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Collections;
5
import java.util.HashMap;
6
import java.util.List;
7
import java.util.Map;
8
import java.util.Map.Entry;
9
import java.util.Set;
10
import java.util.UUID;
11

    
12
import javax.annotation.Resource;
13

    
14
import org.apache.commons.codec.binary.Base64;
15
import org.apache.commons.lang.StringUtils;
16
import org.apache.commons.lang.exception.ExceptionUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.solr.client.solrj.impl.CloudSolrServer;
20
import org.apache.solr.common.SolrInputDocument;
21
import org.dom4j.DocumentException;
22
import org.springframework.beans.factory.annotation.Autowired;
23
import org.springframework.beans.factory.annotation.Value;
24
import org.springframework.stereotype.Controller;
25
import org.springframework.web.bind.annotation.RequestBody;
26
import org.springframework.web.bind.annotation.RequestMapping;
27
import org.springframework.web.bind.annotation.RequestParam;
28
import org.springframework.web.bind.annotation.ResponseBody;
29

    
30
import com.google.common.base.Function;
31
import com.google.common.base.Joiner;
32
import com.google.common.base.Predicate;
33
import com.google.common.base.Splitter;
34
import com.google.common.collect.Iterables;
35
import com.google.common.collect.Lists;
36
import com.google.common.collect.Maps;
37
import com.google.common.collect.Sets;
38

    
39
import eu.dnetlib.data.mapreduce.util.OafDecoder;
40
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
41
import eu.dnetlib.data.proto.OafProtos.Oaf;
42
import eu.dnetlib.data.proto.OafProtos.Oaf.Builder;
43
import eu.dnetlib.data.proto.OafProtos.OafEntity;
44
import eu.dnetlib.data.transform.OafEntityMerger;
45
import eu.dnetlib.data.transform.SolrProtoMapper;
46
import eu.dnetlib.enabling.database.rmi.DatabaseService;
47
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
48
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
49
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
50
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
51
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
52
import eu.dnetlib.functionality.index.client.IndexClient;
53
import eu.dnetlib.functionality.index.client.IndexClientException;
54
import eu.dnetlib.functionality.index.client.ResolvingIndexClientFactory;
55
import eu.dnetlib.functionality.index.client.response.LookupResponse;
56
import eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory;
57
import eu.dnetlib.functionality.modular.ui.AbstractAjaxController;
58
import eu.dnetlib.miscutils.datetime.DateUtils;
59
import eu.dnetlib.pace.config.DedupConfig;
60

    
61
@Controller
62
public class DedupServiceInternalController extends AbstractAjaxController {
63

    
64
	private static final String ID_PREFIX_REGEX = "^\\d\\d\\|";
65

    
66
	private static final Log log = LogFactory.getLog(DedupServiceInternalController.class);
67

    
68
	@Resource
69
	private UniqueServiceLocator serviceLocator;
70

    
71
	@Autowired
72
	private ResultSetClientFactory resultSetClientFactory;
73

    
74
	@Value("${dnet.dedup.db.name}")
75
	private String dbName;
76

    
77
	/** The index client factory. */
78
	@Autowired
79
	private ResolvingIndexClientFactory indexClientFactory;
80

    
81
	@Value("${dnet.dedup.index.format}")
82
	private String indexFormat;
83

    
84
	@Value("${dnet.dedup.index.collection}")
85
	private String dedupIndexCollection;
86

    
87
	private IndexClient indexClient = null;
88

    
89
	/**
	 * Response bean returned by the search endpoints: a total count plus a list of records, each record being a map
	 * from field name to its rendered value.
	 */
	public class OafResult {

		/** Total reported by the index lookup. */
		private long total;

		/** Records of the current page, one map per record. */
		private List<Map<String, String>> results;

		/**
		 * @param total
		 *            total reported by the index lookup
		 * @param results
		 *            records to return
		 */
		public OafResult(final long total, final List<Map<String, String>> results) {
			super();
			setTotal(total);
			setResults(results);
		}

		public long getTotal() {
			return this.total;
		}

		public List<Map<String, String>> getResults() {
			return this.results;
		}

		public void setTotal(final long total) {
			this.total = total;
		}

		public void setResults(final List<Map<String, String>> results) {
			this.results = results;
		}
	}
117

    
118
	@ResponseBody
119
	@RequestMapping(value = "/ui/dedup/lookupConfigurations.do")
120
	public Map<String, List<String>> lookupConfigurations() throws ISLookUpException {
121
		final Map<String, List<String>> res = Maps.newHashMap();
122

    
123
		final ISLookUpService lookUpService = serviceLocator.getService(ISLookUpService.class);
124
		final String listEntityTypesXQuery =
125
				"distinct-values(for $x in //RESOURCE_PROFILE["
126
						+ ".//RESOURCE_TYPE/@value = 'DedupOrchestrationDSResourceType' and "
127
						+ ".//CONFIGURATION/@enabled='true'] return $x//ENTITY/@name/string())";
128

    
129
		for (final String entityType : lookUpService.quickSearchProfile(listEntityTypesXQuery)) {
130
			final String xquery =
131
					String.format(
132
							"for $x in //RESOURCE_PROFILE[" +
133
									".//RESOURCE_TYPE/@value = 'DedupOrchestrationDSResourceType' and .//ENTITY/@name='%s' ] " +
134
									"return $x//ACTION_SET/@id/string()", entityType);
135
			res.put(entityType, lookUpService.quickSearchProfile(xquery));
136
		}
137
		return res;
138
	}
139

    
140
	@ResponseBody
141
	@RequestMapping(value = "/ui/dedup/search.do")
142
	public OafResult search(@RequestParam(value = "entityType", required = true) final String type,
143
			@RequestParam(value = "query", required = true) final String userQuery,
144
			@RequestParam(value = "actionSet", required = true) final String actionSet,
145
			@RequestParam(value = "start", required = true) final int start,
146
			@RequestParam(value = "rows", required = true) final int rows,
147
			@RequestParam(value = "fields", required = true) final String fields) throws Exception {
148

    
149
		try {
150
			final String cqlQuery =
151
					String.format("(>s=SOLR s.q.op=AND) and oaftype = %s and actionset exact \"%s\" and deletedbyinference = false and %s", type, actionSet,
152
							userQuery);
153

    
154
			final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, start, (start + rows) - 1);
155

    
156
			final List<String> fieldList = Lists.newLinkedList(Splitter.on(",").omitEmptyStrings().trimResults().split(fields));
157
			final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(toOaf(rsp), getOaf2FieldMapFunction(type, fieldList)));
158

    
159
			return new OafResult(rsp.getTotal(), resList);
160
		} catch (final Exception e) {
161
			log.error("search error", e);
162
			throw e;
163
		}
164
	}
165

    
166
	@ResponseBody
167
	@RequestMapping(value = "/ui/dedup/searchById.do")
168
	public OafResult searchById(@RequestParam(value = "actionSet", required = true) final String actionSet,
169
			@RequestParam(value = "entityType", required = true) final String type,
170
			@RequestParam(value = "objidentifier", required = true) final String objidentifier,
171
			@RequestParam(value = "fields", required = true) final List<String> fields) throws Exception {
172

    
173
		final String cqlQuery = "objidentifier exact \"" + objidentifier + "\" and actionset exact \"" + actionSet + "\"";
174

    
175
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, 0, 1);
176

    
177
		final Iterable<Oaf> oafList = toOaf(rsp);
178

    
179
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(oafList, getOaf2FieldMapFunction(type, fields)));
180

    
181
		return new OafResult(rsp.getTotal(), resList);
182
	}
183

    
184
	@ResponseBody
185
	@RequestMapping(value = "/ui/dedup/commit.do")
186
	public boolean addSimRels(@RequestBody(required = true) final SimilarityGroup group) throws Exception {
187
		try {
188
			if (StringUtils.isBlank(group.getActionSet())) throw new IllegalArgumentException("missing actionset");
189
			final DatabaseService dbService = serviceLocator.getService(DatabaseService.class);
190
			final String version = InputDocumentFactory.getParsedDateField(DateUtils.now_ISO8601());
191

    
192
			group.setId(UUID.randomUUID().toString());
193
			group.setDate(version);
194

    
195
			// relational DB update
196
			log.info("adding similarities: " + group.getGroup());
197
			updateGroupSql(dbService, group, version);
198

    
199
			log.info("adding dissimilarities: " + group.getDissimilar());
200
			dissimilaritiesSql(dbService, group);
201

    
202
			// index update
203
			log.info("starting index update");
204

    
205
			final CloudSolrServer solrServer = getSolrServer();
206
			final SolrProtoMapper mapper = initProtoMapper();
207

    
208
			final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(version, group.getActionSet(), mapper);
209
			final List<SolrInputDocument> buffer = Lists.newLinkedList();
210

    
211
			final List<Oaf> groupDocs = Lists.newArrayList(markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet())))));
212

    
213
			buffer.addAll(Lists.newArrayList(asIndexDocs(oaf2solr, groupDocs)));
214
			final DedupConfig dedupConf = getDedupConfig(group);
215
			final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(dedupConf, newRootId(group), groupDocs).build());
216
			// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
217
			buffer.add(newRoot);
218

    
219
			final Set<String> dissimIds = getUniqueDissimilarIds(group.getDissimilar());
220
			final List<Oaf> dissimDocs =
221
					Lists.newArrayList(markUnDeleted(asOafBuilder(parseBase64(queryIndex(dissimIds, group.getActionSet())))));
222
			buffer.addAll(Lists.newArrayList(asIndexDocs(oaf2solr, dissimDocs)));
223

    
224
			log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
225

    
226
			final int addStatus = solrServer.add(buffer).getStatus();
227
			log.debug("solr add status: " + addStatus);
228

    
229
			log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
230
			for (final String rootId : Iterables.filter(group.getRootIds(), new Predicate<String>() {
231

    
232
				final String newRootId = (String) newRoot.getFieldValue("objidentifier");
233

    
234
				@Override
235
				public boolean apply(final String rootId) {
236

    
237
					return !rootId.equals(newRootId);
238
				}
239
			})) {
240
				solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet()));
241
			}
242

    
243
			final int commitStatus = solrServer.commit().getStatus();
244
			log.debug("solr commit status: " + commitStatus);
245

    
246
			return (addStatus == 0) && (commitStatus == 0);
247
		} catch (final Exception e) {
248
			log.error(e);
249
			throw e;
250
		}
251
	}
252

    
253
	private Set<String> getUniqueDissimilarIds(final Map<String, Set<String>> map) {
254
		final Set<String> res = Sets.newHashSet();
255
		for (final Set<String> ids : map.values()) {
256
			res.addAll(ids);
257
		}
258

    
259
		return res;
260
	}
261

    
262
	private DedupConfig getDedupConfig(final SimilarityGroup group) throws IOException {
263
		final Map<String, String> config = Maps.newHashMap();
264
		config.put("entityType", group.getEntityType().getType());
265
		config.put("configurationId", group.getActionSet());
266
		final DedupConfig dedupConf = DedupConfig.loadDefault(config);
267
		return dedupConf;
268
	}
269

    
270
	// helpers
271

    
272
	private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
273
		if (indexClient == null) {
274
			indexClient = indexClientFactory.getClient(indexFormat, "index", "dedup", "solr");
275
		}
276
		return indexClient;
277
	}
278

    
279
	/** For each supported entity type, the path of every field that can be requested, keyed by field name. */
	private static Map<String, Map<String, String>> paths = Maps.newHashMap();

	static {
		final Map<String, String> resultPaths = new HashMap<String, String>();
		resultPaths.put("title", "result/metadata/title/value");
		resultPaths.put("dateofacceptance", "result/metadata/dateofacceptance/value");
		resultPaths.put("description", "result/metadata/description/value");
		resultPaths.put("author", "result/author/metadata/fullname/value");

		final Map<String, String> organizationPaths = new HashMap<String, String>();
		organizationPaths.put("legalname", "organization/metadata/legalname/value");
		organizationPaths.put("legalshortname", "organization/metadata/legalshortname/value");
		organizationPaths.put("websiteurl", "organization/metadata/websiteurl/value");
		organizationPaths.put("country", "organization/metadata/country/classid");

		final Map<String, String> personPaths = new HashMap<String, String>();
		personPaths.put("fullname", "person/metadata/fullname/value");

		paths.put("result", resultPaths);
		paths.put("organization", organizationPaths);
		paths.put("person", personPaths);
	}
298

    
299
	private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
300
		return new Function<Oaf, Map<String, String>>() {
301

    
302
			@Override
303
			public Map<String, String> apply(final Oaf oaf) {
304

    
305
				final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
306
				final Map<String, String> res = Maps.newHashMap();
307
				final String oafId = cleanId(oaf.getEntity().getId());
308
				final List<String> idList = Lists.newArrayList(Iterables.transform(oaf.getEntity().getChildrenList(), new Function<OafEntity, String>() {
309

    
310
					@Override
311
					public String apply(final OafEntity e) {
312
						return cleanId(e.getId());
313
					}
314
				}));
315
				if (idList.isEmpty()) {
316
					idList.add(oafId);
317
				}
318
				res.put("id", oafId);
319
				res.put("idList", Joiner.on(",").join(idList));
320
				res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
321

    
322
				for (final String fieldName : fields) {
323
					res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
324
				}
325

    
326
				return res;
327
			}
328
		};
329
	}
330

    
331
	private String cleanId(final String id) {
332
		return id.replaceFirst(ID_PREFIX_REGEX, "");
333
	}
334

    
335
	private String newRootId(final SimilarityGroup group) {
336
		return "dedup_wf_001::" + Collections.min(group.getGroup()).replaceFirst("^.*::", "");
337
		// else return Collections.min(group.getRootIds());
338
	}
339

    
340
	private Iterable<SolrInputDocument> asIndexDocs(final Function<Oaf, SolrInputDocument> mapper, final Iterable<Oaf> protos) {
341
		return Iterables.transform(protos, mapper);
342
	}
343

    
344
	private Function<Oaf, SolrInputDocument> oaf2solr(final String version, final String actionSetId, final SolrProtoMapper mapper) {
345
		return new Function<Oaf, SolrInputDocument>() {
346

    
347
			@Override
348
			public SolrInputDocument apply(final Oaf oaf) {
349
				try {
350
					return mapper.map(oaf, version, "", actionSetId);
351
				} catch (final Throwable e) {
352
					throw new IllegalArgumentException("unable to map proto to index document", e);
353
				}
354
			}
355
		};
356
	}
357

    
358
	private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
359
		return Iterables.transform(ids, idToIndexDocumentMapper(actionset));
360
	}
361

    
362
	private Function<String, String> idToIndexDocumentMapper(final String actionset) {
363
		return new Function<String, String>() {
364

    
365
			@Override
366
			public String apply(final String id) {
367
				try {
368
					final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
369
					final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
370

    
371
					log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
372

    
373
					return Iterables.getOnlyElement(rsp.getRecords());
374
				} catch (final Throwable e) {
375
					log.error(e);
376
					throw new RuntimeException("unable to query id: " + id, e);
377
				}
378
			}
379
		};
380
	}
381

    
382
	private Iterable<Oaf> parseBase64(final Iterable<String> r) {
383
		return Iterables.transform(r, getXml2OafFunction());
384
	}
385

    
386
	private SolrProtoMapper initProtoMapper() throws DocumentException, ISLookUpException, ISLookUpDocumentNotFoundException {
387
		return new SolrProtoMapper(
388
				serviceLocator
389
				.getService(ISLookUpService.class)
390
				.getResourceProfileByQuery(
391
						"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='" + indexFormat
392
						+ "']//LAYOUT[@name='index']/FIELDS"));
393
	}
394

    
395
	private Iterable<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
396
		return Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() {
397

    
398
			@Override
399
			public Oaf apply(final Oaf.Builder builder) {
400
				// TODO add more changes to the Oaf object here as needed.
401
				builder.getDataInfoBuilder().setDeletedbyinference(true);
402
				return builder.build();
403
			}
404
		});
405
	}
406

    
407
	private Iterable<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
408
		return Iterables.transform(builders, new Function<Oaf.Builder, Oaf>() {
409

    
410
			@Override
411
			public Oaf apply(final Oaf.Builder builder) {
412
				// TODO add more changes to the Oaf object here as needed.
413
				builder.getDataInfoBuilder().setDeletedbyinference(false);
414
				return builder.build();
415
			}
416
		});
417
	}
418

    
419
	private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
420
		return Iterables.transform(oaf, new Function<Oaf, Oaf.Builder>() {
421

    
422
			@Override
423
			public Builder apply(final Oaf oaf) {
424
				return Oaf.newBuilder(oaf);
425
			}
426
		});
427
	}
428

    
429
	private Function<String, Oaf> getXml2OafFunction() {
430
		return new Function<String, Oaf>() {
431

    
432
			@Override
433
			public Oaf apply(final String s) {
434
				// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
435
				final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
436
				try {
437
					final byte[] oaf = Base64.decodeBase64(base64);
438
					return OafDecoder.decode(oaf).getOaf();
439
				} catch (final Throwable e) {
440
					throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
441
				}
442
			}
443
		};
444
	}
445

    
446
	private Iterable<Oaf> toOaf(final LookupResponse rsp) {
447
		return Iterables.transform(rsp.getRecords(), getXml2OafFunction());
448
	}
449

    
450
	private void updateGroupSql(final DatabaseService dbService, final SimilarityGroup group, final String version) throws Exception {
451
		for (final String id : group.getGroup()) {
452
			// sql.append(String.format("DELETE FROM entities WHERE objidentifier = '%s'; ", id));
453
			safeUpdateSql(dbService, String.format("DELETE FROM similarity_groups WHERE objidentifier = '%s'; ", id));
454
		}
455
		final String type = group.getEntityType().getType();
456
		safeUpdateSql(dbService,
457
				String.format("INSERT INTO groups(id, entitytype, date, actionsetid) VALUES('%s', '%s', '%s', '%s'); ", group.getId(), type, version,
458
						group.getActionSet()));
459
		for (final String id : group.getGroup()) {
460
			if (!dbService.contains(dbName, "entities", "id", id)) {
461
				safeUpdateSql(dbService, String.format("INSERT INTO entities(id, entitytype) VALUES('%s', '%s'); ", id, type));
462
			}
463

    
464
			// throw new Exception("id already defined in a similarity group.");
465
			safeUpdateSql(dbService, String.format("INSERT INTO similarity_groups(groupid, objidentifier) VALUES('%s', '%s'); ", group.getId(), id));
466
		}
467
	}
468

    
469
	private void dissimilaritiesSql(final DatabaseService dbService, final SimilarityGroup group) {
470

    
471
		final String type = group.getEntityType().getType();
472

    
473
		for (final Entry<String, Set<String>> e : group.getDissimilar().entrySet()) {
474
			if (!dbService.contains(dbName, "entities", "id", e.getKey())) {
475
				safeUpdateSql(dbService, String.format("INSERT INTO entities(id, entitytype) VALUES('%s', '%s'); ", e.getKey(), type));
476
			}
477
			for (final String id : e.getValue()) {
478
				if (!dbService.contains(dbName, "entities", "id", id)) {
479
					safeUpdateSql(dbService, String.format("INSERT INTO entities(id, entitytype) VALUES('%s', '%s'); ", id, type));
480
				}
481
			}
482
		}
483

    
484
		for (final Entry<String, Set<String>> e : group.getDissimilar().entrySet()) {
485
			for (final String id : e.getValue()) {
486
				safeUpdateSql(dbService,
487
						String.format("INSERT INTO dissimilarities(id1, id2, actionsetid) VALUES('%s', '%s', '%s'); ", e.getKey(), id, group.getActionSet()));
488
			}
489
		}
490
	}
491

    
492
	private boolean safeUpdateSql(final DatabaseService dbService, final String sql) {
493
		try {
494
			log.info(sql);
495
			return dbService.updateSQL(dbName, sql);
496
		} catch (final Throwable e) {
497
			log.error(e.getMessage());
498
			log.debug(ExceptionUtils.getFullStackTrace(e));
499
			return false;
500
		}
501
	}
502

    
503
	private CloudSolrServer getSolrServer() {
504
		final String zk = getIndexSolrUrlZk();
505
		log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk));
506
		final CloudSolrServer solrServer = new CloudSolrServer(zk);
507
		solrServer.setDefaultCollection(dedupIndexCollection);
508

    
509
		return solrServer;
510
	}
511

    
512
	private String getIndexSolrUrlZk() {
513
		try {
514
			return getResourceProfileByQuery("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
515
		} catch (final ISLookUpException e) {
516
			throw new IllegalStateException("unable to read solr ZK url from service profile", e);
517
		}
518
	}
519

    
520
	private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
521
		log.debug("quering for service property: " + xquery);
522
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
523
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
524
		return res;
525
	}
526

    
527
}
(2-2/4)