Project

General

Profile

1
package eu.dnetlib.data.dedup;
2

    
3
import java.io.IOException;
4
import java.util.*;
5
import java.util.stream.Collectors;
6
import javax.annotation.Resource;
7

    
8
import com.google.common.base.Function;
9
import com.google.common.base.Joiner;
10
import com.google.common.base.Splitter;
11
import com.google.common.collect.Iterables;
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Maps;
14
import com.google.common.collect.Sets;
15
import com.google.protobuf.GeneratedMessage;
16
import eu.dnetlib.data.mapreduce.util.OafDecoder;
17
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
18
import eu.dnetlib.data.proto.OafProtos.Oaf;
19
import eu.dnetlib.data.transform.AbstractProtoMapper;
20
import eu.dnetlib.data.transform.OafEntityMerger;
21
import eu.dnetlib.data.transform.SolrProtoMapper;
22
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
23
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
24
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
25
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
26
import eu.dnetlib.functionality.index.client.IndexClient;
27
import eu.dnetlib.functionality.index.client.IndexClientException;
28
import eu.dnetlib.functionality.index.client.response.LookupResponse;
29
import eu.dnetlib.functionality.index.client.solr.SolrIndexClient;
30
import eu.dnetlib.functionality.index.client.solr.SolrIndexClientFactory;
31
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
32
import eu.dnetlib.pace.config.DedupConfig;
33
import eu.dnetlib.pace.model.Field;
34
import eu.dnetlib.pace.model.FieldDef;
35
import eu.dnetlib.pace.model.FieldValueImpl;
36
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
37
import org.apache.commons.codec.binary.Base64;
38
import org.apache.commons.lang.StringUtils;
39
import org.apache.commons.logging.Log;
40
import org.apache.commons.logging.LogFactory;
41

    
42
import org.apache.solr.common.SolrInputDocument;
43
import org.dom4j.DocumentException;
44
import org.springframework.beans.factory.annotation.Autowired;
45
import org.springframework.beans.factory.annotation.Value;
46

    
47
public class DedupIndexDAO {
48

    
49
	// Logger shared by all methods of this DAO.
	private static final Log log = LogFactory.getLog(DedupIndexDAO.class);
50

    
51
	// Matches a leading two-digit prefix followed by '|'; cleanId() strips it from identifiers.
	private static final String ID_PREFIX_REGEX = "^\\d\\d\\|";
52

    
53
	// entity type -> (field name -> path handed to getFieldValues() to extract that field from the entity)
	private static final Map<String, Map<String, String>> paths = Maps.newHashMap();
54

    
55
	static {
56
		paths.put("result", new HashMap<>());
57
		paths.put("organization", new HashMap<>());
58
		paths.put("person", new HashMap<>());
59

    
60
		paths.get("result").put("provenance", "collectedfrom/value");
61
		paths.get("organization").put("provenance", "collectedfrom/value");
62
		paths.get("person").put("provenance", "collectedfrom/value");
63

    
64
		paths.get("result").put("title", "result/metadata/title/value");
65
		paths.get("result").put("dateofacceptance", "result/metadata/dateofacceptance/value");
66
		paths.get("result").put("description", "result/metadata/description/value");
67
		paths.get("result").put("author", "result/metadata/author/fullname");
68

    
69
		paths.get("organization").put("legalname", "organization/metadata/legalname/value");
70
		paths.get("organization").put("legalshortname", "organization/metadata/legalshortname/value");
71
		paths.get("organization").put("websiteurl", "organization/metadata/websiteurl/value");
72
		paths.get("organization").put("country", "organization/metadata/country/classid");
73

    
74
		paths.get("person").put("fullname", "person/metadata/fullname/value");
75
	}
76

    
77
	@Resource
78
	// Service locator; this class uses it to obtain the ISLookUpService.
	private UniqueServiceLocator serviceLocator;
79

    
80
	/**
81
	 * The index client factory.
82
	 */
83
	@Autowired
84
	// getIndexClient() asks this factory for the client bound to dedupIndexCollection.
	private SolrIndexClientFactory indexClientFactory;
85

    
86
	// Cached index client; stays null until the first getIndexClient() call creates it.
	private IndexClient indexClient = null;
87

    
88
	@Value("${dnet.dedup.index.collection}")
89
	// Name of the index collection, injected from the dnet.dedup.index.collection property.
	private String dedupIndexCollection;
90

    
91
	public OafResult search(final String type, final String userQuery, final String actionSet, final int start, final int rows, final String fields)
92
			throws Exception {
93

    
94
		final String cqlQuery =
95
				String.format("(>s=SOLR s.q.op=AND) and oaftype = %s and actionset exact \"%s\" and deletedbyinference = false and %s", type, actionSet,
96
						userQuery);
97

    
98
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, start, (start + rows) - 1);
99

    
100
		final List<String> fieldList = Lists.newLinkedList(Splitter.on(",").omitEmptyStrings().trimResults().split(fields));
101
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(toOaf(rsp), getOaf2FieldMapFunction(type, fieldList)));
102

    
103
		return new OafResult(rsp.getTotal(), resList);
104

    
105
	}
106

    
107
	public OafResult searchById(final String actionSet, final String type, final String objidentifier, final List<String> fields) throws Exception {
108
		final String cqlQuery = "objidentifier exact \"" + objidentifier + "\" and actionset exact \"" + actionSet + "\"";
109

    
110
		final LookupResponse rsp = getIndexClient().lookup(cqlQuery, null, 0, 1);
111

    
112
		final Iterable<Oaf> oafList = toOaf(rsp);
113

    
114
		final List<Map<String, String>> resList = Lists.newLinkedList(Iterables.transform(oafList, getOaf2FieldMapFunction(type, fields)));
115

    
116
		return new OafResult(rsp.getTotal(), resList);
117
	}
118

    
119
	public boolean commit(final SimilarityGroup group) throws Exception {
120

    
121
		int commitStatus = 0;
122
		int addStatus = 0;
123

    
124
		log.info("starting index update");
125

    
126
		try(final SolrIndexClient indexClient = (SolrIndexClient) getIndexClient()) {
127
			final SolrProtoMapper mapper = initProtoMapper();
128

    
129
			final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
130
			final List<SolrInputDocument> buffer = Lists.newLinkedList();
131

    
132
			// mark as deleted all the documents in the group
133
			final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
134
			buffer.addAll(asIndexDocs(oaf2solr, groupDocs));
135

    
136
			// elect a new representative
137
			final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
138
			final String newRootId = (String) newRoot.getFieldValue("objidentifier");
139
			// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
140
			buffer.add(newRoot);
141

    
142
			// mark as non deleted the documents taken away from the group
143
			final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
144
			buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));
145

    
146
			log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
147

    
148
			// add the changes to the server
149
			addStatus = indexClient.feed(buffer);
150
			log.debug("solr add status: " + addStatus);
151

    
152
			// delete the old representatives, avoiding to remove the current one (if it didn't change)
153
			log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
154
			for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
155
				indexClient.remove(String.format("objidentifier:\"%s\"", mapper.getRecordId(rootId, group.getActionSet())));
156
			}
157

    
158
			commitStatus = indexClient.commit().getStatus();
159

    
160
			log.debug("solr commit status: " + commitStatus);
161
		}
162

    
163
		return (addStatus == 0) && (commitStatus == 0);
164
	}
165

    
166
	private Iterable<Oaf> toOaf(final LookupResponse rsp) {
167
		return Iterables.transform(rsp.getRecords(), getXml2OafFunction());
168
	}
169

    
170
	private Iterable<Oaf> parseBase64(final Iterable<String> r) {
171
		return Iterables.transform(r, getXml2OafFunction());
172
	}
173

    
174
	private Function<String, Oaf> getXml2OafFunction() {
175
		return s -> {
176
			// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
177
			final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
178
			try {
179
				final byte[] oafBytes = Base64.decodeBase64(base64);
180
				final Oaf oaf = OafDecoder.decode(oafBytes).getOaf();
181
				return oaf;
182
			} catch (final Throwable e) {
183
				throw new IllegalArgumentException("unable to decode base64 encoded Oaf object: " + base64);
184
			}
185
		};
186
	}
187

    
188
	private SolrProtoMapper initProtoMapper() throws DocumentException, ISLookUpException, ISLookUpDocumentNotFoundException {
189
		return new SolrProtoMapper(
190
				serviceLocator
191
						.getService(ISLookUpService.class)
192
						.getResourceProfileByQuery(
193
								"collection('')//RESOURCE_PROFILE["
194
										+ ".//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and "
195
										+ ".//NAME='OPENAIRE']//LAYOUT[@name='index']/FIELDS"));
196
	}
197

    
198
	private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
199
		log.debug("quering for service property: " + xquery);
200
		final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
201
		if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
202
		return res;
203
	}
204

    
205
	private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
206
		return oaf -> {
207

    
208
			final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
209
			final Map<String, String> res = Maps.newHashMap();
210
			final String oafId = cleanId(oaf.getEntity().getId());
211

    
212
			final List<String> idList = oaf.getEntity().getChildrenList().stream()
213
					.map(e -> e.getId())
214
					.map(s -> cleanId(s))
215
					.collect(Collectors.toList());
216
			if (idList.isEmpty()) {
217
				idList.add(oafId);
218
			}
219
			res.put("id", oafId);
220
			res.put("idList", Joiner.on(",").join(idList));
221
			res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
222

    
223
			for (final String fieldName : fields) {
224

    
225
				res.put(fieldName, Joiner.on("; ").skipNulls().join(getFieldValues(ed.getEntity(), fieldName, paths.get(type).get(fieldName))));
226
			}
227

    
228
			return res;
229
		};
230
	}
231

    
232
	private List<String> getFieldValues(final GeneratedMessage m, final String fieldName, final String path) {
233
		return new SolrDocumentMapper().processPath(m, fieldName, path).stream()
234
				.map(o -> o.toString())
235
				.collect(Collectors.toCollection(LinkedList::new));
236
	}
237

    
238
	class SolrDocumentMapper extends AbstractProtoMapper {
239

    
240
		public List<Object> processPath(final GeneratedMessage m, final String fieldName, final String path) {
241
			final FieldDef fd = new FieldDef();
242
			fd.setName(fieldName);
243
			return processPath(m, fd, path);
244
		}
245
	}
246

    
247
	private String cleanId(final String id) {
248
		return id.replaceFirst(ID_PREFIX_REGEX, "");
249
	}
250

    
251
	private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
252
		if (indexClient == null) {
253
			indexClient = indexClientFactory.getClient(dedupIndexCollection);
254
		}
255
		return indexClient;
256
	}
257

    
258
	private Iterable<String> queryIndex(final Iterable<String> ids, final String actionset) {
259
		return Iterables.transform(ids, id -> {
260
			try {
261
				final String cql = "objidentifier exact \"" + id + "\" and actionset exact \"" + actionset + "\"";
262
				final LookupResponse rsp = getIndexClient().lookup(cql, null, 0, 1);
263

    
264
				log.debug(String.format("query index for id '%s', found '%d'", id, rsp.getTotal()));
265

    
266
				return Iterables.getOnlyElement(rsp.getRecords());
267
			} catch (final Throwable e) {
268
				log.error(e);
269
				throw new RuntimeException("unable to query id: " + id, e);
270
			}
271
		});
272
	}
273

    
274
	private List<Oaf> markDeleted(final Iterable<Oaf.Builder> builders) {
275
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
276
			// TODO add more changes to the Oaf object here as needed.
277
			builder.getDataInfoBuilder().setDeletedbyinference(true);
278
			return builder.build();
279
		}));
280
	}
281

    
282
	private List<Oaf> markUnDeleted(final Iterable<Oaf.Builder> builders) {
283
		return Lists.newArrayList(Iterables.transform(builders, builder -> {
284
			// TODO add more changes to the Oaf object here as needed.
285
			builder.getDataInfoBuilder().setDeletedbyinference(false);
286
			return builder.build();
287
		}));
288
	}
289

    
290
	private Iterable<Oaf.Builder> asOafBuilder(final Iterable<Oaf> oaf) {
291
		return Iterables.transform(oaf, oaf1 -> Oaf.newBuilder(oaf1));
292
	}
293

    
294
	private String newRootId(final SimilarityGroup group) {
295
		return "dedup_wf_001::" + Collections.min(group.getGroup()).replaceFirst("^.*::", "");
296
		// else return Collections.min(group.getRootIds());
297
	}
298

    
299
	private List<SolrInputDocument> asIndexDocs(final Function<Oaf, SolrInputDocument> mapper, final Iterable<Oaf> protos) {
300
		return Lists.newArrayList(Iterables.transform(protos, mapper));
301
	}
302

    
303
	private Function<Oaf, SolrInputDocument> oaf2solr(final SimilarityGroup group, final SolrProtoMapper mapper) {
304
		return oaf -> {
305
			try {
306
				return mapper.map(oaf, group.getDate(), "", group.getActionSet());
307
			} catch (final Throwable e) {
308
				throw new IllegalArgumentException("unable to map proto to index document", e);
309
			}
310
		};
311
	}
312

    
313
	private Set<String> unique(final Map<String, Set<String>> map) {
314
		final Set<String> res = Sets.newHashSet();
315
		for (final Set<String> ids : map.values()) {
316
			res.addAll(ids);
317
		}
318

    
319
		return res;
320
	}
321

    
322
	private DedupConfig getDedupConf(final SimilarityGroup group) throws IOException {
323
		final Map<String, String> config = Maps.newHashMap();
324
		config.put("entityType", group.getEntityType().getType());
325
		config.put("configurationId", group.getActionSet());
326
		final DedupConfig dedupConf = DedupConfig.loadDefault(config);
327
		return dedupConf;
328
	}
329

    
330
}
(3-3/6)