Project

General

Profile

1
package eu.dnetlib.data.dedup;
2

    
3
import java.io.IOException;
4
import java.util.*;
5
import java.util.stream.Collectors;
6
import javax.annotation.Resource;
7

    
8
import com.google.common.base.Function;
9
import com.google.common.base.Joiner;
10
import com.google.common.base.Splitter;
11
import com.google.common.collect.Iterables;
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Maps;
14
import com.google.common.collect.Sets;
15
import eu.dnetlib.data.mapreduce.util.OafDecoder;
16
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
17
import eu.dnetlib.data.proto.OafProtos.Oaf;
18
import eu.dnetlib.data.transform.OafEntityMerger;
19
import eu.dnetlib.data.transform.SolrProtoMapper;
20
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
21
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
23
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24
import eu.dnetlib.functionality.index.client.IndexClient;
25
import eu.dnetlib.functionality.index.client.IndexClientException;
26
import eu.dnetlib.functionality.index.client.response.LookupResponse;
27
import eu.dnetlib.functionality.index.client.solr.SolrIndexClient;
28
import eu.dnetlib.functionality.index.client.solr.SolrIndexClientFactory;
29
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
30
import eu.dnetlib.pace.config.DedupConfig;
31
import org.apache.commons.codec.binary.Base64;
32
import org.apache.commons.lang.StringUtils;
33
import org.apache.commons.logging.Log;
34
import org.apache.commons.logging.LogFactory;
35

    
36
import org.apache.solr.common.SolrInputDocument;
37
import org.dom4j.DocumentException;
38
import org.springframework.beans.factory.annotation.Autowired;
39
import org.springframework.beans.factory.annotation.Value;
40

    
41
public class DedupIndexDAO {
42

    
43
	private static final Log log = LogFactory.getLog(DedupIndexDAO.class);
44

    
45
	private static final String ID_PREFIX_REGEX = "^\\d\\d\\|";
46

    
47
	private static final Map<String, Map<String, String>> paths = Maps.newHashMap();
48

    
49
	static {
50
		paths.put("result", new HashMap<>());
51
		paths.put("organization", new HashMap<>());
52
		paths.put("person", new HashMap<>());
53

    
54
		paths.get("result").put("provenance", "collectedfrom/value");
55
		paths.get("organization").put("provenance", "collectedfrom/value");
56
		paths.get("person").put("provenance", "collectedfrom/value");
57

    
58
		paths.get("result").put("title", "result/metadata/title/value");
59
		paths.get("result").put("dateofacceptance", "result/metadata/dateofacceptance/value");
60
		paths.get("result").put("description", "result/metadata/description/value");
61
		paths.get("result").put("author", "result/metadata/author/fullname");
62

    
63
		paths.get("organization").put("legalname", "organization/metadata/legalname/value");
64
		paths.get("organization").put("legalshortname", "organization/metadata/legalshortname/value");
65
		paths.get("organization").put("websiteurl", "organization/metadata/websiteurl/value");
66
		paths.get("organization").put("country", "organization/metadata/country/classid");
67

    
68
		paths.get("person").put("fullname", "person/metadata/fullname/value");
69
	}
70

    
71
	@Resource
72
	private UniqueServiceLocator serviceLocator;
73

    
74
	/**
75
	 * The index client factory.
76
	 */
77
	@Autowired
78
	private SolrIndexClientFactory indexClientFactory;
79

    
80
	private IndexClient indexClient = null;
81

    
82
	@Value("${dnet.dedup.index.collection}")
83
	private String dedupIndexCollection;
84

    
85
	public OafResult search(final String type, final String userQuery, final String actionSet, final int start, final int rows, final String fields)
86
			throws Exception {
87

    
88
		final String cqlQuery =
89
				String.format("(>s=SOLR s.q.op=AND) and oaftype = %s and actionset exact \"%s\" and deletedbyinference = false and %s", type, actionSet,
90
						userQuery);
91

    
92
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, start, (start + rows) - 1);
93

    
94
		final List<String> fieldList = Lists.newLinkedList(Splitter.on(",").omitEmptyStrings().trimResults().split(fields));
95
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(toOaf(rsp), getOaf2FieldMapFunction(type, fieldList)));
96

    
97
		return new OafResult(rsp.getTotal(), resList);
98

    
99
	}
100

    
101
	public OafResult searchById(final String actionSet, final String type, final String objidentifier, final List<String> fields) throws Exception {
102
		final String cqlQuery = "objidentifier exact \"" + objidentifier + "\" and actionset exact \"" + actionSet + "\"";
103

    
104
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, 0, 1);
105

    
106
		final Iterable<Oaf> oafList = toOaf(rsp);
107

    
108
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(oafList, getOaf2FieldMapFunction(type, fields)));
109

    
110
		return new OafResult(rsp.getTotal(), resList);
111
	}
112

    
113
	public boolean commit(final SimilarityGroup group) throws Exception {
114

    
115
		int commitStatus = 0;
116
		int addStatus = 0;
117

    
118
		log.info("starting index update");
119

    
120
		try(final SolrIndexClient indexClient = (SolrIndexClient) getIndexClient()) {
121
			final SolrProtoMapper mapper = initProtoMapper();
122

    
123
			final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
124
			final List<SolrInputDocument> buffer = Lists.newLinkedList();
125

    
126
			// mark as deleted all the documents in the group
127
			final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
128
			buffer.addAll(asIndexDocs(oaf2solr, groupDocs));
129

    
130
			// elect a new representative
131
			final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
132
			final String newRootId = (String) newRoot.getFieldValue("objidentifier");
133
			// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
134
			buffer.add(newRoot);
135

    
136
			// mark as non deleted the documents taken away from the group
137
			final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
138
			buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));
139

    
140
			log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
141

    
142
			// add the changes to the server
143
			addStatus = indexClient.feed(buffer);
144
			log.debug("solr add status: " + addStatus);
145

    
146
			// delete the old representatives, avoiding to remove the current one (if it didn't change)
147
			log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
148
			for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
149
				indexClient.remove(String.format("objidentifier:\"%s\"", mapper.getRecordId(rootId, group.getActionSet())));
150
			}
151

    
152
			commitStatus = indexClient.commit().getStatus();
153

    
154
			log.debug("solr commit status: " + commitStatus);
155
		}
156

    
157
		return (addStatus == 0) && (commitStatus == 0);
158
	}
159

    
160
	private Iterable<Oaf> toOaf(final LookupResponse rsp) {
161
		return Iterables.transform(rsp.getRecords(), getXml2OafFunction());
162
	}
163

    
164
	private Iterable<Oaf> parseBase64(final Iterable<String> r) {
165
		return Iterables.transform(r, getXml2OafFunction());
166
	}
167

    
168
	private Function<String, Oaf> getXml2OafFunction() {
169
		return s -> {
170
			// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
171
			final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
172
			try {
173
				final byte[] oafBytes = Base64.decodeBase64(base64);
174
				final Oaf oaf = OafDecoder.decode(oafBytes).getOaf();
175
				return oaf;
176
			} catch (final Throwable e) {
177
				throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
178
			}
179
		};
180
	}
181

    
182
	private SolrProtoMapper initProtoMapper() throws DocumentException, ISLookUpException, ISLookUpDocumentNotFoundException {
183
		return new SolrProtoMapper(
184
				serviceLocator
185
						.getService(ISLookUpService.class)
186
						.getResourceProfileByQuery(
187
								"collection('')//RESOURCE_PROFILE["
188
										+ ".//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and "
189
										+ ".//NAME='OPENAIRE']//LAYOUT[@name='index']/FIELDS"));
190
	}
191

    
192
	private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
193
		log.debug("quering for service property: " + xquery);
194
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
195
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
196
		return res;
197
	}
198

    
199
	private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
200
		return oaf -> {
201

    
202
			final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
203
			final Map<String, String> res = Maps.newHashMap();
204
			final String oafId = cleanId(oaf.getEntity().getId());
205

    
206
			final List<String> idList = oaf.getEntity().getChildrenList().stream()
207
					.map(e -> e.getId())
208
					.map(s -> cleanId(s))
209
					.collect(Collectors.toList());
210
			if (idList.isEmpty()) {
211
				idList.add(oafId);
212
			}
213
			res.put("id", oafId);
214
			res.put("idList", Joiner.on(",").join(idList));
215
			res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
216

    
217
			for (final String fieldName : fields) {
218
				res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
219
			}
220

    
221
			return res;
222
		};
223
	}
224

    
225
	private String cleanId(final String id) {
226
		return id.replaceFirst(ID_PREFIX_REGEX, "");
227
	}
228

    
229
	private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
230
		if (indexClient == null) {
231
			indexClient = indexClientFactory.getClient(dedupIndexCollection);
232
		}
233
		return indexClient;
234
	}
235

    
236
	private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
237
		return Iterables.transform(ids, id -> {
238
			try {
239
				final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
240
				final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
241

    
242
				log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
243

    
244
				return Iterables.getOnlyElement(rsp.getRecords());
245
			} catch (final Throwable e) {
246
				log.error(e);
247
				throw new RuntimeException("unable to query id: " + id, e);
248
			}
249
		});
250
	}
251

    
252
	private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
253
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
254
			// TODO add more changes to the Oaf object here as needed.
255
			builder.getDataInfoBuilder().setDeletedbyinference(true);
256
			return builder.build();
257
		}));
258
	}
259

    
260
	private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
261
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
262
			// TODO add more changes to the Oaf object here as needed.
263
			builder.getDataInfoBuilder().setDeletedbyinference(false);
264
			return builder.build();
265
		}));
266
	}
267

    
268
	private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
269
		return Iterables.transform(oaf, oaf1 -> Oaf.newBuilder(oaf1));
270
	}
271

    
272
	private String newRootId(final SimilarityGroup group) {
273
		return "dedup_wf_001::" + Collections.min(group.getGroup()).replaceFirst("^.*::", "");
274
		// else return Collections.min(group.getRootIds());
275
	}
276

    
277
	private List<SolrInputDocument> asIndexDocs(final Function<Oaf, SolrInputDocument> mapper, final Iterable<Oaf> protos) {
278
		return Lists.newArrayList(Iterables.transform(protos, mapper));
279
	}
280

    
281
	private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) {
282
		return oaf -> {
283
			try {
284
				return mapper.map(oaf, group.getDate(), "", group.getActionSet());
285
			} catch (final Throwable e) {
286
				throw new IllegalArgumentException("unable to map proto to index document", e);
287
			}
288
		};
289
	}
290

    
291
	private Set<String> unique(final Map<String, Set<String>> map) {
292
		final Set<String> res = Sets.newHashSet();
293
		for (final Set<String> ids : map.values()) {
294
			res.addAll(ids);
295
		}
296

    
297
		return res;
298
	}
299

    
300
	private DedupConfig getDedupConf(final SimilarityGroup group) throws IOException {
301
		final Map<String, String> config = Maps.newHashMap();
302
		config.put("entityType", group.getEntityType().getType());
303
		config.put("configurationId", group.getActionSet());
304
		final DedupConfig dedupConf = DedupConfig.loadDefault(config);
305
		return dedupConf;
306
	}
307

    
308
}
(3-3/6)