Project

General

Profile

1
package eu.dnetlib.data.dedup;
2

    
3
import java.io.IOException;
4
import java.util.*;
5
import java.util.stream.Collectors;
6
import javax.annotation.Resource;
7

    
8
import com.google.common.base.Function;
9
import com.google.common.base.Joiner;
10
import com.google.common.base.Splitter;
11
import com.google.common.collect.Iterables;
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Maps;
14
import com.google.common.collect.Sets;
15
import eu.dnetlib.data.mapreduce.util.OafDecoder;
16
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
17
import eu.dnetlib.data.proto.OafProtos.Oaf;
18
import eu.dnetlib.data.transform.OafEntityMerger;
19
import eu.dnetlib.data.transform.SolrProtoMapper;
20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
21
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24
import eu.dnetlib.functionality.index.client.IndexClient;
25
import eu.dnetlib.functionality.index.client.IndexClientException;
26
import eu.dnetlib.functionality.index.client.ResolvingIndexClientFactory;
27
import eu.dnetlib.functionality.index.client.response.LookupResponse;
28
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
29
import eu.dnetlib.pace.config.DedupConfig;
30
import org.apache.commons.codec.binary.Base64;
31
import org.apache.commons.lang.StringUtils;
32
import org.apache.commons.logging.Log;
33
import org.apache.commons.logging.LogFactory;
34
import org.apache.solr.client.solrj.impl.CloudSolrClient;
35
import org.apache.solr.common.SolrInputDocument;
36
import org.dom4j.DocumentException;
37
import org.springframework.beans.factory.annotation.Autowired;
38
import org.springframework.beans.factory.annotation.Value;
39

    
40
public class DedupIndexDAO {
41

    
42
	/** Logger shared by all instances of this DAO. */
	private static final Log log = LogFactory.getLog(DedupIndexDAO.class);

	/** Regular expression matching two leading digits followed by a pipe character. */
	private static final String ID_PREFIX_REGEX = "^\\d\\d\\|";

	/** For each entity type name, the mapping from a field name to the path used to read its values. */
	private static final Map<String, Map<String, String>> paths = Maps.newHashMap();

	static {
		// Build one field-name -> path table per entity type, then register it under the type name.
		final Map<String, String> resultPaths = new HashMap<>();
		final Map<String, String> organizationPaths = new HashMap<>();
		final Map<String, String> personPaths = new HashMap<>();

		paths.put("result", resultPaths);
		paths.put("organization", organizationPaths);
		paths.put("person", personPaths);

		// Fields available for "result".
		resultPaths.put("provenance", "collectedfrom/value");
		resultPaths.put("title", "result/metadata/title/value");
		resultPaths.put("dateofacceptance", "result/metadata/dateofacceptance/value");
		resultPaths.put("description", "result/metadata/description/value");
		resultPaths.put("author", "result/author/metadata/fullname/value");

		// Fields available for "organization".
		organizationPaths.put("provenance", "collectedfrom/value");
		organizationPaths.put("legalname", "organization/metadata/legalname/value");
		organizationPaths.put("legalshortname", "organization/metadata/legalshortname/value");
		organizationPaths.put("websiteurl", "organization/metadata/websiteurl/value");
		organizationPaths.put("country", "organization/metadata/country/classid");

		// Fields available for "person".
		personPaths.put("provenance", "collectedfrom/value");
		personPaths.put("fullname", "person/metadata/fullname/value");
	}
69

    
70
	@Resource
71
	private UniqueServiceLocator serviceLocator;
72

    
73
	/**
74
	 * The index client factory.
75
	 */
76
	@Autowired
77
	private ResolvingIndexClientFactory indexClientFactory;
78

    
79
	private IndexClient indexClient = null;
80

    
81
	@Value("${dnet.dedup.index.format}")
82
	private String indexFormat;
83

    
84
	@Value("${dnet.dedup.index.collection}")
85
	private String dedupIndexCollection;
86

    
87
	public OafResult search(final String type, final String userQuery, final String actionSet, final int start, final int rows, final String fields)
88
			throws Exception {
89

    
90
		final String cqlQuery =
91
				String.format("(>s=SOLR s.q.op=AND) and oaftype = %s and actionset exact \"%s\" and deletedbyinference = false and %s", type, actionSet,
92
						userQuery);
93

    
94
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, start, (start + rows) - 1);
95

    
96
		final List<String> fieldList = Lists.newLinkedList(Splitter.on(",").omitEmptyStrings().trimResults().split(fields));
97
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(toOaf(rsp), getOaf2FieldMapFunction(type, fieldList)));
98

    
99
		return new OafResult(rsp.getTotal(), resList);
100

    
101
	}
102

    
103
	public OafResult searchById(final String actionSet, final String type, final String objidentifier, final List<String> fields) throws Exception {
104
		final String cqlQuery = "objidentifier exact \"" + objidentifier + "\" and actionset exact \"" + actionSet + "\"";
105

    
106
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, 0, 1);
107

    
108
		final Iterable<Oaf> oafList = toOaf(rsp);
109

    
110
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(oafList, getOaf2FieldMapFunction(type, fields)));
111

    
112
		return new OafResult(rsp.getTotal(), resList);
113
	}
114

    
115
	public boolean commit(final SimilarityGroup group) throws Exception {
116

    
117
		int commitStatus = 0;
118
		int addStatus = 0;
119

    
120
		log.info("starting index update");
121

    
122
		try(final CloudSolrClient solrServer = getSolrServer()) {
123
			final SolrProtoMapper mapper = initProtoMapper();
124

    
125
			final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
126
			final List<SolrInputDocument> buffer = Lists.newLinkedList();
127

    
128
			// mark as deleted all the documents in the group
129
			final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
130
			buffer.addAll(asIndexDocs(oaf2solr, groupDocs));
131

    
132
			// elect a new representative
133
			final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
134
			final String newRootId = (String) newRoot.getFieldValue("objidentifier");
135
			// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
136
			buffer.add(newRoot);
137

    
138
			// mark as non deleted the documents taken away from the group
139
			final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
140
			buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));
141

    
142
			log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
143

    
144
			// add the changes to the server
145
			addStatus = solrServer.add(buffer).getStatus();
146
			log.debug("solr add status: " + addStatus);
147

    
148
			// delete the old representatives, avoiding to remove the current one (if it didn't change)
149
			log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
150
			for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
151
				solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet()));
152
			}
153

    
154
			commitStatus = solrServer.commit().getStatus();
155

    
156
			log.debug("solr commit status: " + commitStatus);
157
		}
158

    
159
		return (addStatus == 0) && (commitStatus == 0);
160
	}
161

    
162
	private Iterable<Oaf> toOaf(final LookupResponse rsp) {
163
		return Iterables.transform(rsp.getRecords(), getXml2OafFunction());
164
	}
165

    
166
	private Iterable<Oaf> parseBase64(final Iterable<String> r) {
167
		return Iterables.transform(r, getXml2OafFunction());
168
	}
169

    
170
	private Function<String, Oaf> getXml2OafFunction() {
171
		return s -> {
172
			// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
173
			final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
174
			try {
175
				final byte[] oafBytes = Base64.decodeBase64(base64);
176
				final Oaf oaf = OafDecoder.decode(oafBytes).getOaf();
177
				return oaf;
178
			} catch (final Throwable e) {
179
				throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
180
			}
181
		};
182
	}
183

    
184
	private SolrProtoMapper initProtoMapper() throws DocumentException, ISLookUpException, ISLookUpDocumentNotFoundException {
185
		return new SolrProtoMapper(
186
				serviceLocator
187
						.getService(ISLookUpService.class)
188
						.getResourceProfileByQuery(
189
								"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='" + indexFormat
190
										+ "']//LAYOUT[@name='index']/FIELDS"));
191
	}
192

    
193
	private CloudSolrClient getSolrServer() {
194
		final String zk = getIndexSolrUrlZk();
195
		log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk));
196
		final CloudSolrClient solrServer = new CloudSolrClient.Builder().withZkHost(zk).build();
197
		solrServer.setDefaultCollection(dedupIndexCollection);
198

    
199
		return solrServer;
200
	}
201

    
202
	private String getIndexSolrUrlZk() {
203
		try {
204
			return getResourceProfileByQuery(
205
					"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
206
		} catch (final ISLookUpException e) {
207
			throw new IllegalStateException("unable to read solr ZK url from service profile", e);
208
		}
209
	}
210

    
211
	private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
212
		log.debug("quering for service property: " + xquery);
213
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
214
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
215
		return res;
216
	}
217

    
218
	private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
219
		return oaf -> {
220

    
221
			final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
222
			final Map<String, String> res = Maps.newHashMap();
223
			final String oafId = cleanId(oaf.getEntity().getId());
224

    
225
			final List<String> idList = oaf.getEntity().getChildrenList().stream()
226
					.map(e -> e.getId())
227
					.map(s -> cleanId(s))
228
					.collect(Collectors.toList());
229
			if (idList.isEmpty()) {
230
				idList.add(oafId);
231
			}
232
			res.put("id", oafId);
233
			res.put("idList", Joiner.on(",").join(idList));
234
			res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
235

    
236
			for (final String fieldName : fields) {
237
				res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
238
			}
239

    
240
			return res;
241
		};
242
	}
243

    
244
	private String cleanId(final String id) {
245
		return id.replaceFirst(ID_PREFIX_REGEX, "");
246
	}
247

    
248
	private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
249
		if (indexClient == null) {
250
			indexClient = indexClientFactory.getClient(indexFormat, "index", "dedup", "solr");
251
		}
252
		return indexClient;
253
	}
254

    
255
	private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
256
		return Iterables.transform(ids, id -> {
257
			try {
258
				final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
259
				final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
260

    
261
				log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
262

    
263
				return Iterables.getOnlyElement(rsp.getRecords());
264
			} catch (final Throwable e) {
265
				log.error(e);
266
				throw new RuntimeException("unable to query id: " + id, e);
267
			}
268
		});
269
	}
270

    
271
	private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
272
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
273
			// TODO add more changes to the Oaf object here as needed.
274
			builder.getDataInfoBuilder().setDeletedbyinference(true);
275
			return builder.build();
276
		}));
277
	}
278

    
279
	private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
280
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
281
			// TODO add more changes to the Oaf object here as needed.
282
			builder.getDataInfoBuilder().setDeletedbyinference(false);
283
			return builder.build();
284
		}));
285
	}
286

    
287
	private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
288
		return Iterables.transform(oaf, oaf1 -> Oaf.newBuilder(oaf1));
289
	}
290

    
291
	private String newRootId(final SimilarityGroup group) {
292
		return "dedup_wf_001::" + Collections.min(group.getGroup()).replaceFirst("^.*::", "");
293
		// else return Collections.min(group.getRootIds());
294
	}
295

    
296
	private List<SolrInputDocument> asIndexDocs(final Function<Oaf, SolrInputDocument> mapper, final Iterable<Oaf> protos) {
297
		return Lists.newArrayList(Iterables.transform(protos, mapper));
298
	}
299

    
300
	private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) {
301
		return oaf -> {
302
			try {
303
				return mapper.map(oaf, group.getDate(), "", group.getActionSet());
304
			} catch (final Throwable e) {
305
				throw new IllegalArgumentException("unable to map proto to index document", e);
306
			}
307
		};
308
	}
309

    
310
	private Set<String> unique(final Map<String, Set<String>> map) {
311
		final Set<String> res = Sets.newHashSet();
312
		for (final Set<String> ids : map.values()) {
313
			res.addAll(ids);
314
		}
315

    
316
		return res;
317
	}
318

    
319
	private DedupConfig getDedupConf(final SimilarityGroup group) throws IOException {
320
		final Map<String, String> config = Maps.newHashMap();
321
		config.put("entityType", group.getEntityType().getType());
322
		config.put("configurationId", group.getActionSet());
323
		final DedupConfig dedupConf = DedupConfig.loadDefault(config);
324
		return dedupConf;
325
	}
326

    
327
}
(3-3/6)