Project

General

Profile

1
package eu.dnetlib.data.dedup;
2

    
3
import java.io.IOException;
4
import java.util.*;
5
import java.util.stream.Collectors;
6
import javax.annotation.Resource;
7

    
8
import com.google.common.base.Function;
9
import com.google.common.base.Joiner;
10
import com.google.common.base.Splitter;
11
import com.google.common.collect.Iterables;
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Maps;
14
import com.google.common.collect.Sets;
15
import eu.dnetlib.data.mapreduce.util.OafDecoder;
16
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
17
import eu.dnetlib.data.proto.OafProtos.Oaf;
18
import eu.dnetlib.data.transform.OafEntityMerger;
19
import eu.dnetlib.data.transform.SolrProtoMapper;
20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
21
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24
import eu.dnetlib.functionality.index.client.IndexClient;
25
import eu.dnetlib.functionality.index.client.IndexClientException;
26
import eu.dnetlib.functionality.index.client.ResolvingIndexClientFactory;
27
import eu.dnetlib.functionality.index.client.response.LookupResponse;
28
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
29
import eu.dnetlib.pace.config.DedupConfig;
30
import org.apache.commons.codec.binary.Base64;
31
import org.apache.commons.lang.StringUtils;
32
import org.apache.commons.logging.Log;
33
import org.apache.commons.logging.LogFactory;
34
import org.apache.solr.client.solrj.impl.CloudSolrServer;
35
import org.apache.solr.common.SolrInputDocument;
36
import org.dom4j.DocumentException;
37
import org.springframework.beans.factory.annotation.Autowired;
38
import org.springframework.beans.factory.annotation.Value;
39

    
40
public class DedupIndexDAO {
41

    
42
	private static final Log log = LogFactory.getLog(DedupIndexDAO.class);
43

    
44
	private static final String ID_PREFIX_REGEX = "^\\d\\d\\|";
45

    
46
	private static final Map<String, Map<String, String>> paths = Maps.newHashMap();
47

    
48
	static {
49
		paths.put("result", new HashMap<>());
50
		paths.put("organization", new HashMap<>());
51
		paths.put("person", new HashMap<>());
52

    
53
		paths.get("result").put("provenance", "collectedfrom/value");
54
		paths.get("organization").put("provenance", "collectedfrom/value");
55
		paths.get("person").put("provenance", "collectedfrom/value");
56

    
57
		paths.get("result").put("title", "result/metadata/title/value");
58
		paths.get("result").put("dateofacceptance", "result/metadata/dateofacceptance/value");
59
		paths.get("result").put("description", "result/metadata/description/value");
60
		paths.get("result").put("author", "result/metadata/author/fullname");
61

    
62
		paths.get("organization").put("legalname", "organization/metadata/legalname/value");
63
		paths.get("organization").put("legalshortname", "organization/metadata/legalshortname/value");
64
		paths.get("organization").put("websiteurl", "organization/metadata/websiteurl/value");
65
		paths.get("organization").put("country", "organization/metadata/country/classid");
66

    
67
		paths.get("person").put("fullname", "person/metadata/fullname/value");
68
	}
69

    
70
	@Resource
71
	private UniqueServiceLocator serviceLocator;
72

    
73
	/**
74
	 * The index client factory.
75
	 */
76
	@Autowired
77
	private ResolvingIndexClientFactory indexClientFactory;
78

    
79
	private IndexClient indexClient = null;
80

    
81
	@Value("${dnet.dedup.index.format}")
82
	private String indexFormat;
83

    
84
	@Value("${dnet.dedup.index.collection}")
85
	private String dedupIndexCollection;
86

    
87
	public OafResult search(final String type, final String userQuery, final String actionSet, final int start, final int rows, final String fields)
88
			throws Exception {
89

    
90
		final String cqlQuery =
91
				String.format("(>s=SOLR s.q.op=AND) and oaftype = %s and actionset exact \"%s\" and deletedbyinference = false and %s", type, actionSet,
92
						userQuery);
93

    
94
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, start, (start + rows) - 1);
95

    
96
		final List<String> fieldList = Lists.newLinkedList(Splitter.on(",").omitEmptyStrings().trimResults().split(fields));
97
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(toOaf(rsp), getOaf2FieldMapFunction(type, fieldList)));
98

    
99
		return new OafResult(rsp.getTotal(), resList);
100

    
101
	}
102

    
103
	public OafResult searchById(final String actionSet, final String type, final String objidentifier, final List<String> fields) throws Exception {
104
		final String cqlQuery = "objidentifier exact \"" + objidentifier + "\" and actionset exact \"" + actionSet + "\"";
105

    
106
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, 0, 1);
107

    
108
		final Iterable<Oaf> oafList = toOaf(rsp);
109

    
110
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(oafList, getOaf2FieldMapFunction(type, fields)));
111

    
112
		return new OafResult(rsp.getTotal(), resList);
113
	}
114

    
115
	public boolean commit(final SimilarityGroup group) throws Exception {
116

    
117
		int commitStatus = 0;
118
		int addStatus = 0;
119

    
120
		log.info("starting index update");
121

    
122
		final CloudSolrServer solrServer = getSolrServer();
123
		try {
124
			final SolrProtoMapper mapper = initProtoMapper();
125

    
126
			final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
127
			final List<SolrInputDocument> buffer = Lists.newLinkedList();
128

    
129
			// mark as deleted all the documents in the group
130
			final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
131
			buffer.addAll(asIndexDocs(oaf2solr, groupDocs));
132

    
133
			// elect a new representative
134
			final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
135
			final String newRootId = (String) newRoot.getFieldValue("objidentifier");
136
			// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
137
			buffer.add(newRoot);
138

    
139
			// mark as non deleted the documents taken away from the group
140
			final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
141
			buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));
142

    
143
			log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
144

    
145
			// add the changes to the server
146
			addStatus = solrServer.add(buffer).getStatus();
147
			log.debug("solr add status: " + addStatus);
148

    
149
			// delete the old representatives, avoiding to remove the current one (if it didn't change)
150
			log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
151
			for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
152
				solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet()));
153
			}
154

    
155
			commitStatus = solrServer.commit().getStatus();
156

    
157
			log.debug("solr commit status: " + commitStatus);
158
		} finally {
159
			solrServer.shutdown();
160
		}
161

    
162
		return (addStatus == 0) && (commitStatus == 0);
163
	}
164

    
165
	private Iterable<Oaf> toOaf(final LookupResponse rsp) {
166
		return Iterables.transform(rsp.getRecords(), getXml2OafFunction());
167
	}
168

    
169
	private Iterable<Oaf> parseBase64(final Iterable<String> r) {
170
		return Iterables.transform(r, getXml2OafFunction());
171
	}
172

    
173
	private Function<String, Oaf> getXml2OafFunction() {
174
		return s -> {
175
			// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
176
			final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
177
			try {
178
				final byte[] oafBytes = Base64.decodeBase64(base64);
179
				final Oaf oaf = OafDecoder.decode(oafBytes).getOaf();
180
				return oaf;
181
			} catch (final Throwable e) {
182
				throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
183
			}
184
		};
185
	}
186

    
187
	private SolrProtoMapper initProtoMapper() throws DocumentException, ISLookUpException, ISLookUpDocumentNotFoundException {
188
		return new SolrProtoMapper(
189
				serviceLocator
190
						.getService(ISLookUpService.class)
191
						.getResourceProfileByQuery(
192
								"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='" + indexFormat
193
										+ "']//LAYOUT[@name='index']/FIELDS"));
194
	}
195

    
196
	private CloudSolrServer getSolrServer() {
197
		final String zk = getIndexSolrUrlZk();
198
		log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk));
199
		final CloudSolrServer solrServer = new CloudSolrServer(zk);
200
		solrServer.setDefaultCollection(dedupIndexCollection);
201

    
202
		return solrServer;
203
	}
204

    
205
	private String getIndexSolrUrlZk() {
206
		try {
207
			return getResourceProfileByQuery(
208
					"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
209
		} catch (final ISLookUpException e) {
210
			throw new IllegalStateException("unable to read solr ZK url from service profile", e);
211
		}
212
	}
213

    
214
	private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
215
		log.debug("quering for service property: " + xquery);
216
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
217
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
218
		return res;
219
	}
220

    
221
	private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
222
		return oaf -> {
223

    
224
			final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
225
			final Map<String, String> res = Maps.newHashMap();
226
			final String oafId = cleanId(oaf.getEntity().getId());
227

    
228
			final List<String> idList = oaf.getEntity().getChildrenList().stream()
229
					.map(e -> e.getId())
230
					.map(s -> cleanId(s))
231
					.collect(Collectors.toList());
232
			if (idList.isEmpty()) {
233
				idList.add(oafId);
234
			}
235
			res.put("id", oafId);
236
			res.put("idList", Joiner.on(",").join(idList));
237
			res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
238

    
239
			for (final String fieldName : fields) {
240
				res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
241
			}
242

    
243
			return res;
244
		};
245
	}
246

    
247
	private String cleanId(final String id) {
248
		return id.replaceFirst(ID_PREFIX_REGEX, "");
249
	}
250

    
251
	private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
252
		if (indexClient == null) {
253
			indexClient = indexClientFactory.getClient(indexFormat, "index", "dedup", "solr");
254
		}
255
		return indexClient;
256
	}
257

    
258
	private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
259
		return Iterables.transform(ids, id -> {
260
			try {
261
				final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
262
				final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
263

    
264
				log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
265

    
266
				return Iterables.getOnlyElement(rsp.getRecords());
267
			} catch (final Throwable e) {
268
				log.error(e);
269
				throw new RuntimeException("unable to query id: " + id, e);
270
			}
271
		});
272
	}
273

    
274
	private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
275
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
276
			// TODO add more changes to the Oaf object here as needed.
277
			builder.getDataInfoBuilder().setDeletedbyinference(true);
278
			return builder.build();
279
		}));
280
	}
281

    
282
	private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
283
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
284
			// TODO add more changes to the Oaf object here as needed.
285
			builder.getDataInfoBuilder().setDeletedbyinference(false);
286
			return builder.build();
287
		}));
288
	}
289

    
290
	private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
291
		return Iterables.transform(oaf, oaf1 -> Oaf.newBuilder(oaf1));
292
	}
293

    
294
	private String newRootId(final SimilarityGroup group) {
295
		return "dedup_wf_001::" + Collections.min(group.getGroup()).replaceFirst("^.*::", "");
296
		// else return Collections.min(group.getRootIds());
297
	}
298

    
299
	private List<SolrInputDocument> asIndexDocs(final Function<Oaf, SolrInputDocument> mapper, final Iterable<Oaf> protos) {
300
		return Lists.newArrayList(Iterables.transform(protos, mapper));
301
	}
302

    
303
	private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) {
304
		return oaf -> {
305
			try {
306
				return mapper.map(oaf, group.getDate(), "", group.getActionSet());
307
			} catch (final Throwable e) {
308
				throw new IllegalArgumentException("unable to map proto to index document", e);
309
			}
310
		};
311
	}
312

    
313
	private Set<String> unique(final Map<String, Set<String>> map) {
314
		final Set<String> res = Sets.newHashSet();
315
		for (final Set<String> ids : map.values()) {
316
			res.addAll(ids);
317
		}
318

    
319
		return res;
320
	}
321

    
322
	private DedupConfig getDedupConf(final SimilarityGroup group) throws IOException {
323
		final Map<String, String> config = Maps.newHashMap();
324
		config.put("entityType", group.getEntityType().getType());
325
		config.put("configurationId", group.getActionSet());
326
		final DedupConfig dedupConf = DedupConfig.loadDefault(config);
327
		return dedupConf;
328
	}
329

    
330
}
(3-3/6)