12 |
12 |
import com.google.common.collect.Lists;
|
13 |
13 |
import com.google.common.collect.Maps;
|
14 |
14 |
import com.google.common.collect.Sets;
|
|
15 |
import com.google.protobuf.GeneratedMessage;
|
15 |
16 |
import eu.dnetlib.data.mapreduce.util.OafDecoder;
|
16 |
17 |
import eu.dnetlib.data.mapreduce.util.OafEntityDecoder;
|
17 |
18 |
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
|
19 |
import eu.dnetlib.data.transform.AbstractProtoMapper;
|
18 |
20 |
import eu.dnetlib.data.transform.OafEntityMerger;
|
19 |
21 |
import eu.dnetlib.data.transform.SolrProtoMapper;
|
20 |
22 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpDocumentNotFoundException;
|
... | ... | |
23 |
25 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
|
24 |
26 |
import eu.dnetlib.functionality.index.client.IndexClient;
|
25 |
27 |
import eu.dnetlib.functionality.index.client.IndexClientException;
|
26 |
|
import eu.dnetlib.functionality.index.client.ResolvingIndexClientFactory;
|
27 |
28 |
import eu.dnetlib.functionality.index.client.response.LookupResponse;
|
|
29 |
import eu.dnetlib.functionality.index.client.solr.SolrIndexClient;
|
|
30 |
import eu.dnetlib.functionality.index.client.solr.SolrIndexClientFactory;
|
28 |
31 |
import eu.dnetlib.functionality.modular.ui.dedup.SimilarityGroup;
|
29 |
32 |
import eu.dnetlib.pace.config.DedupConfig;
|
|
33 |
import eu.dnetlib.pace.model.Field;
|
|
34 |
import eu.dnetlib.pace.model.FieldDef;
|
|
35 |
import eu.dnetlib.pace.model.FieldValueImpl;
|
|
36 |
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
|
30 |
37 |
import org.apache.commons.codec.binary.Base64;
|
31 |
38 |
import org.apache.commons.lang.StringUtils;
|
32 |
39 |
import org.apache.commons.logging.Log;
|
33 |
40 |
import org.apache.commons.logging.LogFactory;
|
34 |
|
import org.apache.solr.client.solrj.impl.CloudSolrServer;
|
|
41 |
|
|
42 |
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
35 |
43 |
import org.apache.solr.common.SolrInputDocument;
|
36 |
44 |
import org.dom4j.DocumentException;
|
37 |
45 |
import org.springframework.beans.factory.annotation.Autowired;
|
... | ... | |
74 |
82 |
* The index client factory.
|
75 |
83 |
*/
|
76 |
84 |
@Autowired
|
77 |
|
private ResolvingIndexClientFactory indexClientFactory;
|
|
85 |
private SolrIndexClientFactory indexClientFactory;
|
78 |
86 |
|
79 |
87 |
private IndexClient indexClient = null;
|
80 |
88 |
|
81 |
|
@Value("${dnet.dedup.index.format}")
|
82 |
|
private String indexFormat;
|
83 |
|
|
84 |
89 |
@Value("${dnet.dedup.index.collection}")
|
85 |
90 |
private String dedupIndexCollection;
|
86 |
91 |
|
... | ... | |
119 |
124 |
|
120 |
125 |
log.info("starting index update");
|
121 |
126 |
|
122 |
|
final CloudSolrServer solrServer = getSolrServer();
|
123 |
|
try {
|
124 |
|
final SolrProtoMapper mapper = initProtoMapper();
|
|
127 |
final SolrIndexClient indexClient = (SolrIndexClient) getIndexClient();
|
|
128 |
final SolrProtoMapper mapper = initProtoMapper();
|
125 |
129 |
|
126 |
|
final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
|
127 |
|
final List<SolrInputDocument> buffer = Lists.newLinkedList();
|
|
130 |
final Function<Oaf, SolrInputDocument> oaf2solr = oaf2solr(group, mapper);
|
|
131 |
final List<SolrInputDocument> buffer = Lists.newLinkedList();
|
128 |
132 |
|
129 |
|
// mark as deleted all the documents in the group
|
130 |
|
final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
|
131 |
|
buffer.addAll(asIndexDocs(oaf2solr, groupDocs));
|
|
133 |
// mark as deleted all the documents in the group
|
|
134 |
final List<Oaf> groupDocs = markDeleted(asOafBuilder(parseBase64(queryIndex(group.getGroup(), group.getActionSet()))));
|
|
135 |
buffer.addAll(asIndexDocs(oaf2solr, groupDocs));
|
132 |
136 |
|
133 |
|
// elect a new representative
|
134 |
|
final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
|
135 |
|
final String newRootId = (String) newRoot.getFieldValue("objidentifier");
|
136 |
|
// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
|
137 |
|
buffer.add(newRoot);
|
|
137 |
// elect a new representative
|
|
138 |
final SolrInputDocument newRoot = oaf2solr.apply(OafEntityMerger.merge(getDedupConf(group), newRootId(group), groupDocs).build());
|
|
139 |
final String newRootId = (String) newRoot.getFieldValue("objidentifier");
|
|
140 |
// newRoot.setField("actionset", dedupConf.getWf().getConfigurationId());
|
|
141 |
buffer.add(newRoot);
|
138 |
142 |
|
139 |
|
// mark as non deleted the documents taken away from the group
|
140 |
|
final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
|
141 |
|
buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));
|
|
143 |
// mark as non deleted the documents taken away from the group
|
|
144 |
final List<Oaf> dissimDocs = markUnDeleted(asOafBuilder(parseBase64(queryIndex(unique(group.getDissimilar()), group.getActionSet()))));
|
|
145 |
buffer.addAll(asIndexDocs(oaf2solr, dissimDocs));
|
142 |
146 |
|
143 |
|
log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
|
|
147 |
log.debug(String.format("adding %d documents to index %s", buffer.size(), dedupIndexCollection));
|
144 |
148 |
|
145 |
|
// add the changes to the server
|
146 |
|
addStatus = solrServer.add(buffer).getStatus();
|
147 |
|
log.debug("solr add status: " + addStatus);
|
|
149 |
// add the changes to the server
|
|
150 |
addStatus = indexClient.feed(buffer);
|
|
151 |
log.debug("solr add status: " + addStatus);
|
148 |
152 |
|
149 |
|
// delete the old representatives, avoiding to remove the current one (if it didn't change)
|
150 |
|
log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
|
151 |
|
for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
|
152 |
|
solrServer.deleteById(mapper.getRecordId(rootId, group.getActionSet()));
|
153 |
|
}
|
|
153 |
// delete the old representatives, avoiding to remove the current one (if it didn't change)
|
|
154 |
log.debug(String.format("deleting %d documents from index %s", group.getRootIds().size(), dedupIndexCollection));
|
|
155 |
for (final String rootId : Iterables.filter(group.getRootIds(), rootId -> !rootId.equals(newRootId))) {
|
|
156 |
indexClient.remove(mapper.getRecordId(rootId, group.getActionSet()));
|
|
157 |
}
|
154 |
158 |
|
155 |
|
commitStatus = solrServer.commit().getStatus();
|
|
159 |
commitStatus = indexClient.commit().getStatus();
|
156 |
160 |
|
157 |
|
log.debug("solr commit status: " + commitStatus);
|
158 |
|
} finally {
|
159 |
|
solrServer.shutdown();
|
160 |
|
}
|
|
161 |
log.debug("solr commit status: " + commitStatus);
|
161 |
162 |
|
162 |
163 |
return (addStatus == 0) && (commitStatus == 0);
|
163 |
164 |
}
|
... | ... | |
170 |
171 |
return Iterables.transform(r, getXml2OafFunction());
|
171 |
172 |
}
|
172 |
173 |
|
173 |
|
private Function<String, Oaf> getXml2OafFunction() {
|
|
174 |
protected Function<String, Oaf> getXml2OafFunction() {
|
174 |
175 |
return s -> {
|
175 |
176 |
// final String base64 = s.replaceAll("<record.*>", "").replace("</record>", "");
|
176 |
177 |
final String base64 = StringUtils.substringBefore(StringUtils.substringAfter(s, ">"), "<");
|
... | ... | |
189 |
190 |
serviceLocator
|
190 |
191 |
.getService(ISLookUpService.class)
|
191 |
192 |
.getResourceProfileByQuery(
|
192 |
|
"collection('')//RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and .//NAME='" + indexFormat
|
193 |
|
+ "']//LAYOUT[@name='index']/FIELDS"));
|
|
193 |
"collection('')//RESOURCE_PROFILE["
|
|
194 |
+ ".//RESOURCE_TYPE/@value = 'MDFormatDSResourceType' and "
|
|
195 |
+ ".//NAME='OPENAIRE']//LAYOUT[@name='index']/FIELDS"));
|
194 |
196 |
}
|
195 |
197 |
|
196 |
|
private CloudSolrServer getSolrServer() {
|
197 |
|
final String zk = getIndexSolrUrlZk();
|
198 |
|
log.info(String.format("initializing solr client for collection %s, zk url: %s", dedupIndexCollection, zk));
|
199 |
|
final CloudSolrServer solrServer = new CloudSolrServer(zk);
|
200 |
|
solrServer.setDefaultCollection(dedupIndexCollection);
|
201 |
|
|
202 |
|
return solrServer;
|
203 |
|
}
|
204 |
|
|
205 |
|
private String getIndexSolrUrlZk() {
|
206 |
|
try {
|
207 |
|
return getResourceProfileByQuery(
|
208 |
|
"for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()");
|
209 |
|
} catch (final ISLookUpException e) {
|
210 |
|
throw new IllegalStateException("unable to read solr ZK url from service profile", e);
|
211 |
|
}
|
212 |
|
}
|
213 |
|
|
214 |
198 |
private String getResourceProfileByQuery(final String xquery) throws ISLookUpException {
|
215 |
199 |
log.debug("quering for service property: " + xquery);
|
216 |
200 |
final String res = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
|
... | ... | |
218 |
202 |
return res;
|
219 |
203 |
}
|
220 |
204 |
|
221 |
|
private Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
|
|
205 |
protected Function<Oaf, Map<String, String>> getOaf2FieldMapFunction(final String type, final List<String> fields) {
|
222 |
206 |
return oaf -> {
|
223 |
207 |
|
224 |
|
final OafEntityDecoder ed = OafDecoder.decode(oaf).decodeEntity();
|
225 |
208 |
final Map<String, String> res = Maps.newHashMap();
|
226 |
209 |
final String oafId = cleanId(oaf.getEntity().getId());
|
227 |
210 |
|
... | ... | |
237 |
220 |
res.put("groupSize", idList.isEmpty() ? "1" : idList.size() + "");
|
238 |
221 |
|
239 |
222 |
for (final String fieldName : fields) {
|
240 |
|
res.put(fieldName, Joiner.on("; ").skipNulls().join(ed.getFieldValues(paths.get(type).get(fieldName))));
|
|
223 |
|
|
224 |
res.put(fieldName, Joiner.on("; ").skipNulls().join(getFieldValues((GeneratedMessage) oaf.getEntity(), fieldName, paths.get(type).get(fieldName))));
|
241 |
225 |
}
|
242 |
226 |
|
243 |
227 |
return res;
|
244 |
228 |
};
|
245 |
229 |
}
|
246 |
230 |
|
|
231 |
private List<String> getFieldValues(final GeneratedMessage m, final String fieldName, final String path) {
|
|
232 |
return new SolrDocumentMapper().processPath(m, fieldName, path).stream()
|
|
233 |
.map(o -> o.toString())
|
|
234 |
.collect(Collectors.toCollection(LinkedList::new));
|
|
235 |
}
|
|
236 |
|
|
237 |
class SolrDocumentMapper extends AbstractProtoMapper {
|
|
238 |
|
|
239 |
public List<Object> processPath(final GeneratedMessage m, final String fieldName, final String path) {
|
|
240 |
final FieldDef fd = new FieldDef();
|
|
241 |
fd.setName(fieldName);
|
|
242 |
return processPath(m, fd, path);
|
|
243 |
}
|
|
244 |
}
|
|
245 |
|
247 |
246 |
private String cleanId(final String id) {
|
248 |
247 |
return id.replaceFirst(ID_PREFIX_REGEX, "");
|
249 |
248 |
}
|
250 |
249 |
|
251 |
250 |
private IndexClient getIndexClient() throws IndexClientException, ISLookUpDocumentNotFoundException, ISLookUpException {
|
252 |
251 |
if (indexClient == null) {
|
253 |
|
indexClient = indexClientFactory.getClient(indexFormat, "index", "dedup", "solr");
|
|
252 |
indexClient = indexClientFactory.getClient(dedupIndexCollection);
|
254 |
253 |
}
|
255 |
254 |
return indexClient;
|
256 |
255 |
}
|
reintegrated branch solr75 -r53788:HEAD