1
|
package eu.dnetlib.efg.backlinks;
|
2
|
|
3
|
import java.io.StringReader;
|
4
|
import java.io.StringWriter;
|
5
|
import java.util.*;
|
6
|
import java.util.concurrent.ArrayBlockingQueue;
|
7
|
import java.util.concurrent.BlockingQueue;
|
8
|
import java.util.concurrent.ThreadPoolExecutor;
|
9
|
import java.util.concurrent.TimeUnit;
|
10
|
import javax.xml.namespace.QName;
|
11
|
import javax.xml.parsers.DocumentBuilder;
|
12
|
import javax.xml.parsers.DocumentBuilderFactory;
|
13
|
import javax.xml.parsers.ParserConfigurationException;
|
14
|
import javax.xml.stream.*;
|
15
|
import javax.xml.stream.events.EndElement;
|
16
|
import javax.xml.stream.events.StartElement;
|
17
|
import javax.xml.stream.events.XMLEvent;
|
18
|
import javax.xml.transform.*;
|
19
|
import javax.xml.transform.dom.DOMSource;
|
20
|
import javax.xml.transform.stream.StreamResult;
|
21
|
import javax.xml.transform.stream.StreamSource;
|
22
|
|
23
|
import com.mongodb.BasicDBObject;
|
24
|
import com.mongodb.DBObject;
|
25
|
import com.mongodb.client.MongoCollection;
|
26
|
import com.mongodb.client.MongoDatabase;
|
27
|
import com.mongodb.client.model.Filters;
|
28
|
import com.mongodb.client.model.UpdateOptions;
|
29
|
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
|
30
|
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
|
31
|
import eu.dnetlib.rmi.data.MDStoreServiceException;
|
32
|
import org.antlr.stringtemplate.StringTemplate;
|
33
|
import org.apache.commons.lang3.StringEscapeUtils;
|
34
|
import org.apache.commons.logging.Log;
|
35
|
import org.apache.commons.logging.LogFactory;
|
36
|
import org.bson.conversions.Bson;
|
37
|
import org.springframework.beans.factory.annotation.Autowired;
|
38
|
import org.springframework.beans.factory.annotation.Required;
|
39
|
import org.springframework.core.io.Resource;
|
40
|
import org.w3c.dom.Document;
|
41
|
import org.w3c.dom.Element;
|
42
|
import org.w3c.dom.Node;
|
43
|
|
44
|
/**
|
45
|
* handle backlinks
|
46
|
* <p/>
|
47
|
* AvCreation Collection Corporate NonAVCreation Person
|
48
|
*
|
49
|
* @author marko
|
50
|
*/
|
51
|
public class MongoBuildBacklinks {
|
52
|
|
53
|
private static final Log log = LogFactory.getLog(MongoBuildBacklinks.class); // NOPMD by marko on 11/24/08 5:02 PM
|
54
|
private final int queueSize = 80;
|
55
|
private final int cpus = Runtime.getRuntime().availableProcessors();
|
56
|
private final ThreadLocal<XMLInputFactory> factory = new ThreadLocal<XMLInputFactory>() {
|
57
|
|
58
|
@Override
|
59
|
protected XMLInputFactory initialValue() {
|
60
|
return XMLInputFactory.newInstance();
|
61
|
}
|
62
|
};
|
63
|
private final ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
|
64
|
|
65
|
@Override
|
66
|
protected XMLOutputFactory initialValue() {
|
67
|
return XMLOutputFactory.newInstance();
|
68
|
}
|
69
|
};
|
70
|
private final ThreadLocal<Transformer> serializer = new ThreadLocal<Transformer>() {
|
71
|
|
72
|
@Override
|
73
|
protected Transformer initialValue() {
|
74
|
final TransformerFactory factory = TransformerFactory.newInstance();
|
75
|
try {
|
76
|
final Transformer trans = factory.newTransformer();
|
77
|
trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
|
78
|
return trans;
|
79
|
} catch (final TransformerConfigurationException e) {
|
80
|
throw new IllegalStateException(e);
|
81
|
}
|
82
|
}
|
83
|
};
|
84
|
private final ThreadLocal<DocumentBuilder> docBuilder = new ThreadLocal<DocumentBuilder>() {
|
85
|
|
86
|
@Override
|
87
|
protected DocumentBuilder initialValue() {
|
88
|
final DocumentBuilderFactory dbfac = DocumentBuilderFactory.newInstance();
|
89
|
try {
|
90
|
return dbfac.newDocumentBuilder();
|
91
|
} catch (final ParserConfigurationException e) {
|
92
|
throw new IllegalStateException(e);
|
93
|
}
|
94
|
}
|
95
|
};
|
96
|
public UpdateOperation sentinel = new UpdateOperation(null, null);
|
97
|
@Autowired
|
98
|
private MDStoreTransactionManagerImpl transactionManager;
|
99
|
private BacklinkTypeMatcher backlinkTypeMatcher;
|
100
|
private int MAX_NUMBER_OF_RELS = 1000;
|
101
|
private List<String> relationTypes = new ArrayList<>();
|
102
|
private Resource fixLinksXslt;
|
103
|
private StringTemplate backlinkTemplate;
|
104
|
private Set<String> entityTypes;
|
105
|
private List<String> titleTypes;
|
106
|
|
107
|
public void process(final String mdId, final DoneCallback doneCallback) throws MDStoreServiceException {
|
108
|
try {
|
109
|
log.debug("generating backlink for mdstore ID:" + mdId);
|
110
|
|
111
|
ThreadPoolExecutor executor = createExecutor();
|
112
|
|
113
|
final String internalId = transactionManager.readMdStore(mdId);
|
114
|
|
115
|
final MongoDatabase db = transactionManager.getDb();
|
116
|
|
117
|
final MongoCollection<DBObject> backlinks = db.getCollection(mdId + "backlinks", DBObject.class);
|
118
|
|
119
|
backlinks.deleteMany(new BasicDBObject());
|
120
|
|
121
|
final BlockingQueue<UpdateOperation> emitQueue = new ArrayBlockingQueue<>(500);
|
122
|
|
123
|
final MongoCollection<DBObject> data = db.getCollection(internalId, DBObject.class);
|
124
|
|
125
|
final long size = data.count();
|
126
|
double currentPosition = 0;
|
127
|
|
128
|
backlinks.drop();
|
129
|
backlinks.createIndex(new BasicDBObject("id", 1));
|
130
|
data.createIndex(new BasicDBObject("id", 1));
|
131
|
data.createIndex(new BasicDBObject("originalId", 1));
|
132
|
|
133
|
log.info("start background updating thread");
|
134
|
final Thread backgroundUpdating = startBackgroundUpdating(emitQueue, backlinks);
|
135
|
|
136
|
log.info("start parsing links");
|
137
|
currentPosition = parseLinks(currentPosition, executor, data, emitQueue);
|
138
|
|
139
|
executor.shutdown();
|
140
|
executor.awaitTermination(10, TimeUnit.MINUTES);
|
141
|
log.info("finish parallel part: grouping " + executor.isShutdown());
|
142
|
|
143
|
emitQueue.put(sentinel);
|
144
|
backgroundUpdating.join();
|
145
|
|
146
|
log.info("finish background backlinks storing");
|
147
|
|
148
|
executor = createExecutor();
|
149
|
log.info("new executor " + executor.isShutdown());
|
150
|
|
151
|
fixLinks(data, size, backlinks, executor, currentPosition);
|
152
|
|
153
|
log.info("waiting for other executor");
|
154
|
executor.shutdown();
|
155
|
executor.awaitTermination(10, TimeUnit.MINUTES);
|
156
|
log.info("finish parallel part: fixing");
|
157
|
|
158
|
backlinks.createIndex(new BasicDBObject("id", 1));
|
159
|
data.createIndex(new BasicDBObject("id", 1));
|
160
|
data.createIndex(new BasicDBObject("originalId", 1));
|
161
|
Map<String, String> paramsOut = new HashMap<>();
|
162
|
doneCallback.call(paramsOut);
|
163
|
|
164
|
} catch (final InterruptedException e) {
|
165
|
throw new IllegalStateException(e);
|
166
|
}
|
167
|
}
|
168
|
|
169
|
private void fixLinks(final MongoCollection<DBObject> data,
|
170
|
final long size,
|
171
|
final MongoCollection<DBObject> backlinks,
|
172
|
final ThreadPoolExecutor executor,
|
173
|
double currentPosition
|
174
|
) {
|
175
|
final double step = (1.0 * size) / backlinks.count();
|
176
|
|
177
|
log.info("FIXLINKS");
|
178
|
|
179
|
for (final DBObject link : backlinks.find()) {
|
180
|
|
181
|
log.debug("FIXING LINK " + link);
|
182
|
try {
|
183
|
executor.submit((Runnable) () -> {
|
184
|
try {
|
185
|
@SuppressWarnings("unchecked")
|
186
|
List<DBObject> links = (List<DBObject>) link.get("links");
|
187
|
fixLinks(data, (String) link.get("id"), links);
|
188
|
} catch (final Throwable e) {
|
189
|
log.warn("problems fixing", e);
|
190
|
}
|
191
|
});
|
192
|
currentPosition += step;
|
193
|
} catch (final Throwable e) {
|
194
|
log.warn("problems fixing", e);
|
195
|
}
|
196
|
}
|
197
|
|
198
|
log.info("FINISH FIXLINKS");
|
199
|
}
|
200
|
|
201
|
private double parseLinks(double currentPosition,
|
202
|
final ThreadPoolExecutor executor,
|
203
|
final MongoCollection<DBObject> data,
|
204
|
final BlockingQueue<UpdateOperation> emitQueue) {
|
205
|
// log.info("PARSING LINKS " + collection);
|
206
|
for (final DBObject current : data.find()) {
|
207
|
try {
|
208
|
// log.info("PARSING LINK " + current.get("originalId"));
|
209
|
executor.submit((Runnable) () -> parseLinks((String) current.get("body"), emitQueue));
|
210
|
currentPosition++;
|
211
|
} catch (final Throwable e) {
|
212
|
log.warn("problems parsing", e);
|
213
|
}
|
214
|
}
|
215
|
return currentPosition;
|
216
|
}
|
217
|
|
218
|
private Thread startBackgroundUpdating(final BlockingQueue<UpdateOperation> queue, final MongoCollection<DBObject> coll) {
|
219
|
final Thread background = new Thread(() -> {
|
220
|
while (true) {
|
221
|
try {
|
222
|
final UpdateOperation record = queue.take();
|
223
|
if (record == sentinel) {
|
224
|
break;
|
225
|
}
|
226
|
|
227
|
final UpdateOptions op = new UpdateOptions();
|
228
|
op.upsert(true);
|
229
|
coll.updateOne((Bson) record.find, (Bson) record.update, op);
|
230
|
} catch (final InterruptedException e) {
|
231
|
log.fatal("got exception in background thread", e);
|
232
|
throw new IllegalStateException(e);
|
233
|
}
|
234
|
}
|
235
|
});
|
236
|
background.start();
|
237
|
return background;
|
238
|
}
|
239
|
|
240
|
protected ThreadPoolExecutor createExecutor() {
|
241
|
return new ThreadPoolExecutor(cpus, cpus, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true),
|
242
|
new ThreadPoolExecutor.CallerRunsPolicy());
|
243
|
}
|
244
|
|
245
|
private void fixLinks(final MongoCollection<DBObject> collection, final String id, final List<DBObject> links) {
|
246
|
|
247
|
final Bson query = Filters.eq("originalId", id);
|
248
|
final BasicDBObject source = (BasicDBObject) collection.find(query).first();
|
249
|
|
250
|
if (source == null) {
|
251
|
log.warn("object with originalId " + id + " doesn't exist or doesn't belong to this collection");
|
252
|
return;
|
253
|
}
|
254
|
|
255
|
final String origBody = (String) source.get("body");
|
256
|
log.info("UPDATING LINKS " + id);
|
257
|
final String updated = updateLinks(id, origBody, links);
|
258
|
// for (DBObject link : links) {
|
259
|
// log.info(" link from " + id + " to: " + link.get("target"));
|
260
|
// }
|
261
|
|
262
|
if (!origBody.equalsIgnoreCase(updated)) {
|
263
|
|
264
|
collection.updateOne(query, new BasicDBObject("$set", new BasicDBObject("body", updated)));
|
265
|
}
|
266
|
}
|
267
|
|
268
|
public boolean isEfgEntity(final StartElement element) {
|
269
|
return isEfgEntity(element.getName());
|
270
|
}
|
271
|
|
272
|
public boolean isEfgEntity(final EndElement element) {
|
273
|
return isEfgEntity(element.getName());
|
274
|
}
|
275
|
|
276
|
public boolean isEfgEntity(final QName name) {
|
277
|
return entityTypes.contains(name.getLocalPart());
|
278
|
}
|
279
|
|
280
|
public boolean isRelation(final XMLEvent event) {
|
281
|
if (event.isStartElement()) return isRelation(event.asStartElement());
|
282
|
return false;
|
283
|
}
|
284
|
|
285
|
public boolean isEndRelation(final XMLEvent event) {
|
286
|
if (event.isEndElement()) return isRelation(event.asEndElement());
|
287
|
return false;
|
288
|
}
|
289
|
|
290
|
public boolean isRelation(final StartElement element) {
|
291
|
return isRelation(element.getName());
|
292
|
}
|
293
|
|
294
|
public boolean isRelation(final EndElement element) {
|
295
|
return isRelation(element.getName());
|
296
|
}
|
297
|
|
298
|
public boolean isRelation(final QName name) {
|
299
|
return name.getLocalPart().startsWith("rel");
|
300
|
}
|
301
|
|
302
|
public XMLEventReader fragment(final String source) throws XMLStreamException {
|
303
|
return fragment(factory.get().createXMLEventReader(new StringReader(source)));
|
304
|
}
|
305
|
|
306
|
public XMLEventReader fragment(final XMLEventReader reader) throws XMLStreamException {
|
307
|
return factory.get().createFilteredReader(reader, new EventFilter() {
|
308
|
|
309
|
@Override
|
310
|
public boolean accept(final XMLEvent evt) {
|
311
|
return !(evt.isStartDocument() || evt.isEndDocument());
|
312
|
}
|
313
|
});
|
314
|
}
|
315
|
|
316
|
public String linkToFragment(final DBObject link) {
|
317
|
StringTemplate template = new StringTemplate(backlinkTemplate.getTemplate());
|
318
|
|
319
|
template.setAttribute("relType", prepareValueForTemplate(link.get("type")));
|
320
|
template.setAttribute("targetId", prepareValueForTemplate(link.get("target")));
|
321
|
template.setAttribute("title", prepareValueForTemplate(link.get("title")));
|
322
|
template.setAttribute("name", prepareValueForTemplate(link.get("name")));
|
323
|
template.setAttribute("type", prepareValueForTemplate(backlinkTypeMatcher.getInverseType(link.get("elementType").toString())));
|
324
|
template.setAttribute("itemTypes", prepareValueForTemplate(link.get("itemTypes")));
|
325
|
|
326
|
String res = template.toString();
|
327
|
|
328
|
if (log.isDebugEnabled()) {
|
329
|
log.debug("UPDATING adding link: " + res.replaceAll("\n", ""));
|
330
|
}
|
331
|
|
332
|
return res;
|
333
|
}
|
334
|
|
335
|
private Object prepareValueForTemplate(final Object obj) {
|
336
|
if ((obj != null) && (obj instanceof String)) return StringEscapeUtils.escapeXml11(obj.toString());
|
337
|
return obj;
|
338
|
}
|
339
|
|
340
|
private String maybeIdentifier(final XMLEventReader reader, final XMLEventWriter writer, final XMLEvent event) throws XMLStreamException {
|
341
|
if (event.isStartElement() && event.asStartElement().getName().getLocalPart().equalsIgnoreCase("identifier")) {
|
342
|
XMLEvent next = reader.peek();
|
343
|
|
344
|
if (next.isCharacters()) return next.asCharacters().getData().trim();
|
345
|
}
|
346
|
return null;
|
347
|
}
|
348
|
|
349
|
public DBObject findLink(final String currentRelId, final List<DBObject> links) {
|
350
|
for (DBObject link : links)
|
351
|
if (link.get("target").equals(currentRelId)) return link;
|
352
|
return null;
|
353
|
}
|
354
|
|
355
|
@SuppressWarnings("unchecked")
|
356
|
public String updateLinks(final String id, final String body, final List<DBObject> links) {
|
357
|
log.info("UPDATING links " + id);
|
358
|
|
359
|
try {
|
360
|
StringWriter res = new StringWriter();
|
361
|
|
362
|
XMLEventReader reader = factory.get().createXMLEventReader(new StringReader(body));
|
363
|
XMLEventWriter writer = outputFactory.get().createXMLEventWriter(res);
|
364
|
|
365
|
Set<String> present = new HashSet<>();
|
366
|
|
367
|
boolean inRelation = false;
|
368
|
boolean hasItemType = false;
|
369
|
String currentRelId = null;
|
370
|
|
371
|
XMLEvent event = null;
|
372
|
while (!(event = reader.nextEvent()).isEndDocument()) {
|
373
|
|
374
|
if (isRelation(event)) {
|
375
|
inRelation = true;
|
376
|
currentRelId = null;
|
377
|
} else if (isEndRelation(event)) {
|
378
|
inRelation = false;
|
379
|
|
380
|
if (!hasItemType) {
|
381
|
DBObject obj = findLink(currentRelId, links);
|
382
|
if (obj != null) {
|
383
|
final Object itemTypes = obj.get("itemTypes");
|
384
|
if (itemTypes != null) {
|
385
|
for (String itemType : (List<String>) itemTypes) {
|
386
|
writer.add(fragment("<efg:itemType xmlns:efg=\"http://www.europeanfilmgateway.eu/efg\" generated=\"true\">" + itemType
|
387
|
+ "</efg:itemType>"));
|
388
|
}
|
389
|
}
|
390
|
}
|
391
|
}
|
392
|
}
|
393
|
|
394
|
if (inRelation) {
|
395
|
// if current element is the identifier, mark the relation as seen.
|
396
|
String relId = maybeIdentifier(reader, writer, event);
|
397
|
if (relId != null) {
|
398
|
currentRelId = relId;
|
399
|
present.add(relId);
|
400
|
}
|
401
|
if (event.isStartElement() && event.asStartElement().getName().getLocalPart().equalsIgnoreCase("itemType")) {
|
402
|
hasItemType = true;
|
403
|
}
|
404
|
}
|
405
|
|
406
|
// flush new backlinks at the end of the record
|
407
|
if (event.isEndElement()) {
|
408
|
EndElement end = event.asEndElement();
|
409
|
if (isEfgEntity(end)) {
|
410
|
log.info("UPDATING end element is efgEntity");
|
411
|
|
412
|
for (int i = 0; i < Math.min(links.size(), MAX_NUMBER_OF_RELS); i++) {
|
413
|
DBObject link = links.get(i);
|
414
|
// generate only those (back)links which are not already present.
|
415
|
if (!present.contains(link.get("target"))) {
|
416
|
writer.add(fragment(linkToFragment(link)));
|
417
|
}
|
418
|
}
|
419
|
}
|
420
|
}
|
421
|
|
422
|
writer.add(event);
|
423
|
}
|
424
|
writer.close();
|
425
|
return res.toString();
|
426
|
} catch (Exception e) {
|
427
|
throw new IllegalStateException(e);
|
428
|
}
|
429
|
|
430
|
}
|
431
|
|
432
|
protected Document createLinksParams(final List<DBObject> links) throws ParserConfigurationException {
|
433
|
final Document doc = docBuilder.get().newDocument();
|
434
|
|
435
|
createLinksParams(links, doc);
|
436
|
|
437
|
return doc;
|
438
|
}
|
439
|
|
440
|
protected Document createLinksParams(final List<DBObject> links, final Document doc) throws ParserConfigurationException {
|
441
|
return createLinksParams(links, doc, doc);
|
442
|
}
|
443
|
|
444
|
protected Document createLinksParams(final List<DBObject> links, final Document doc, final Node container) throws ParserConfigurationException {
|
445
|
final Element root = doc.createElement("links");
|
446
|
|
447
|
container.appendChild(root);
|
448
|
|
449
|
for (final DBObject link : links) {
|
450
|
final Element node = doc.createElement("link");
|
451
|
node.setAttribute("target", (String) link.get("target"));
|
452
|
node.setAttribute("type", (String) link.get("type"));
|
453
|
|
454
|
Arrays.asList("name", "title").stream().filter(attr -> link.get(attr) != null).forEach(attr -> {
|
455
|
node.setAttribute(attr, (String) link.get(attr));
|
456
|
});
|
457
|
|
458
|
root.appendChild(node);
|
459
|
}
|
460
|
return doc;
|
461
|
}
|
462
|
|
463
|
protected String serialize(final Node doc) {
|
464
|
try {
|
465
|
// create string from xml tree
|
466
|
final StringWriter sw = new StringWriter();
|
467
|
final StreamResult result = new StreamResult(sw);
|
468
|
final DOMSource source = new DOMSource(doc);
|
469
|
|
470
|
serializer.get().transform(source, result);
|
471
|
return sw.toString();
|
472
|
|
473
|
} catch (final TransformerException e) {
|
474
|
log.warn("cannot serialize", e);
|
475
|
}
|
476
|
return "";
|
477
|
}
|
478
|
|
479
|
public boolean moreSpecific(final String next, final String old, final List<String> categories) {
|
480
|
if (old == null) return true;
|
481
|
if (next == null) return false;
|
482
|
return categories.indexOf(next.toLowerCase()) > categories.indexOf(old.toLowerCase());
|
483
|
}
|
484
|
|
485
|
private void parseLinks(final String record, final BlockingQueue<UpdateOperation> emitQueue) {
|
486
|
// log.info("record: " + record);
|
487
|
String recordId = null;
|
488
|
String recordType = null;
|
489
|
String relType = null;
|
490
|
|
491
|
final List<ToEmit> toEmit = new ArrayList<>();
|
492
|
final Map<String, String> params = new HashMap<>();
|
493
|
|
494
|
// for persons:
|
495
|
String firstName = null;
|
496
|
String lastName = "";
|
497
|
|
498
|
// for creations
|
499
|
String title = null;
|
500
|
String titleType = null;
|
501
|
|
502
|
Stack<String> elementStack = new Stack<String>();
|
503
|
elementStack.push("/");
|
504
|
|
505
|
try {
|
506
|
final XMLStreamReader parser = factory.get().createXMLStreamReader(new StreamSource(new StringReader(record)));
|
507
|
while (parser.hasNext()) {
|
508
|
int event = parser.next();
|
509
|
|
510
|
if (event == XMLStreamConstants.END_ELEMENT) {
|
511
|
pop(elementStack);
|
512
|
} else if (event == XMLStreamConstants.START_ELEMENT) {
|
513
|
final String localName = parser.getLocalName();
|
514
|
push(elementStack, localName);
|
515
|
|
516
|
if ("efgEntity".equalsIgnoreCase(localName)) {
|
517
|
while (parser.hasNext()) {
|
518
|
event = parser.next();
|
519
|
|
520
|
if (event == XMLStreamConstants.START_ELEMENT) {
|
521
|
recordType = parser.getLocalName();
|
522
|
relType = normalizeRecordType(recordType);
|
523
|
|
524
|
push(elementStack, recordType);
|
525
|
break;
|
526
|
}
|
527
|
}
|
528
|
}
|
529
|
|
530
|
if ("identifier".equalsIgnoreCase(localName) && "efgEntity".equalsIgnoreCase(grandParent(elementStack))) {
|
531
|
parser.next();
|
532
|
recordId = parser.getText();
|
533
|
} else if ("identifier".equalsIgnoreCase(localName)) {
|
534
|
parser.next();
|
535
|
// log.info("CCCCC: is an id but not the right depth: " + parser.getText());
|
536
|
// log.info("stack " + elementStack);
|
537
|
} else if ("person".equalsIgnoreCase(recordType)) {
|
538
|
if ("name".equalsIgnoreCase(localName)) {
|
539
|
final String part = parser.getAttributeValue(null, "part");
|
540
|
parser.next();
|
541
|
String value = parser.getText();
|
542
|
if (value != null) {
|
543
|
value = value.trim();
|
544
|
}
|
545
|
|
546
|
if ("forename".equalsIgnoreCase(part) || "Forename(s)".equalsIgnoreCase(part)) {
|
547
|
firstName = value;
|
548
|
} else if ("surname".equalsIgnoreCase(part) || "Family Name".equalsIgnoreCase(part)) {
|
549
|
lastName = value;
|
550
|
} else if ("stem".equalsIgnoreCase(part)) {
|
551
|
lastName = value;
|
552
|
}
|
553
|
}
|
554
|
} else if (isCreation(recordType)) {
|
555
|
if ("title".equalsIgnoreCase(localName)) {
|
556
|
String tmpTitle = null;
|
557
|
String tmpType = null;
|
558
|
|
559
|
while (parser.hasNext()) {
|
560
|
event = parser.next();
|
561
|
|
562
|
if (event == XMLStreamConstants.START_ELEMENT) {
|
563
|
push(elementStack, parser.getLocalName());
|
564
|
|
565
|
if ("text".equalsIgnoreCase(parser.getLocalName())) {
|
566
|
event = parser.next();
|
567
|
if (event == XMLStreamConstants.CHARACTERS) {
|
568
|
tmpTitle = parser.getText();
|
569
|
}
|
570
|
} else if ("relation".equalsIgnoreCase(parser.getLocalName())) {
|
571
|
event = parser.next();
|
572
|
if (event == XMLStreamConstants.CHARACTERS) {
|
573
|
tmpType = parser.getText();
|
574
|
}
|
575
|
}
|
576
|
|
577
|
} else if (event == XMLStreamConstants.END_ELEMENT) {
|
578
|
pop(elementStack);
|
579
|
|
580
|
if ("title".equalsIgnoreCase(parser.getLocalName())) {
|
581
|
tmpType = (tmpType == null) ? "n/a" : tmpType.trim();
|
582
|
tmpTitle = (tmpTitle == null) ? "n/a" : tmpTitle.trim();
|
583
|
|
584
|
if (moreSpecific(tmpType, titleType, titleTypes)) {
|
585
|
title = tmpTitle;
|
586
|
titleType = tmpType;
|
587
|
}
|
588
|
|
589
|
break;
|
590
|
}
|
591
|
}
|
592
|
}
|
593
|
}
|
594
|
}
|
595
|
|
596
|
if (relationTypes.contains(localName)) {
|
597
|
ToEmit nextEmission = null;
|
598
|
|
599
|
while (parser.hasNext()) {
|
600
|
event = parser.next();
|
601
|
|
602
|
if (event == XMLStreamConstants.START_ELEMENT) {
|
603
|
push(elementStack, parser.getLocalName());
|
604
|
|
605
|
if ("identifier".equalsIgnoreCase(parser.getLocalName())) {
|
606
|
parser.next();
|
607
|
|
608
|
// toEmit.add(new ToEmit(parser.getText().trim(), "n/a"));
|
609
|
nextEmission = new ToEmit(parser.getText().trim(), "n/a");
|
610
|
// break;
|
611
|
} else if ("type".equalsIgnoreCase(parser.getLocalName())) {
|
612
|
parser.next();
|
613
|
|
614
|
nextEmission.setElementType(parser.getText().trim());
|
615
|
toEmit.add(nextEmission);
|
616
|
break;
|
617
|
}
|
618
|
} else if (event == XMLStreamConstants.END_ELEMENT) {
|
619
|
pop(elementStack);
|
620
|
}
|
621
|
}
|
622
|
}
|
623
|
}
|
624
|
}
|
625
|
|
626
|
if ("person".equalsIgnoreCase(recordType)) {
|
627
|
String personName = lastName;
|
628
|
if (firstName != null) {
|
629
|
personName = firstName + " " + lastName;
|
630
|
}
|
631
|
|
632
|
params.put("name", personName);
|
633
|
} else if (isCreation(recordType) && (title != null)) {
|
634
|
params.put("title", title);
|
635
|
}
|
636
|
|
637
|
if (toEmit.isEmpty() && log.isDebugEnabled()) {
|
638
|
log.debug("emit queue is empty for record " + recordId.trim());
|
639
|
}
|
640
|
|
641
|
for (final ToEmit emitting : toEmit) {
|
642
|
emitBacklink(emitQueue, emitting.targetId, recordId.trim(), relType, emitting.elementType, params);
|
643
|
}
|
644
|
|
645
|
} catch (final XMLStreamException e) {
|
646
|
log.info("some parsing exception, moving along");
|
647
|
}
|
648
|
|
649
|
}
|
650
|
|
651
|
private String grandParent(final Stack<String> elementStack) {
|
652
|
if (elementStack.size() <= 3) return "";
|
653
|
String res = elementStack.get(elementStack.size() - 3);
|
654
|
return res;
|
655
|
}
|
656
|
|
657
|
private void pop(final Stack<String> elementStack) {
|
658
|
elementStack.pop();
|
659
|
}
|
660
|
|
661
|
private void push(final Stack<String> elementStack, final String localName) {
|
662
|
elementStack.push(localName);
|
663
|
}
|
664
|
|
665
|
private boolean isCreation(final String recordType) {
|
666
|
return "avcreation".equalsIgnoreCase(recordType) || "nonavcreation".equalsIgnoreCase(recordType);
|
667
|
}
|
668
|
|
669
|
private String normalizeRecordType(final String localName) {
|
670
|
final String relName = "rel" + localName;
|
671
|
|
672
|
for (final String type : relationTypes) {
|
673
|
if (relName.equalsIgnoreCase(type)) return type;
|
674
|
}
|
675
|
return relName;
|
676
|
}
|
677
|
|
678
|
private void emitBacklink(final BlockingQueue<UpdateOperation> emitQueue,
|
679
|
final String source,
|
680
|
final String target,
|
681
|
final String targetType,
|
682
|
final String elementType,
|
683
|
final Map<String, String> params) {
|
684
|
|
685
|
final DBObject pair = new BasicDBObject();
|
686
|
pair.put("target", target);
|
687
|
pair.put("type", targetType);
|
688
|
pair.put("elementType", elementType);
|
689
|
pair.putAll(params);
|
690
|
|
691
|
final DBObject find = new BasicDBObject("id", source);
|
692
|
|
693
|
final DBObject update = new BasicDBObject("$push", new BasicDBObject("links", pair));
|
694
|
|
695
|
try {
|
696
|
emitQueue.put(new UpdateOperation(find, update));
|
697
|
} catch (final InterruptedException e) {
|
698
|
throw new IllegalStateException(e);
|
699
|
}
|
700
|
}
|
701
|
|
702
|
public List<String> getRelationTypes() {
|
703
|
return relationTypes;
|
704
|
}
|
705
|
|
706
|
public void setRelationTypes(final List<String> relationTypes) {
|
707
|
this.relationTypes = relationTypes;
|
708
|
}
|
709
|
|
710
|
public Resource getFixLinksXslt() {
|
711
|
return fixLinksXslt;
|
712
|
}
|
713
|
|
714
|
@Required
|
715
|
public void setFixLinksXslt(final Resource fixLinksXslt) {
|
716
|
this.fixLinksXslt = fixLinksXslt;
|
717
|
}
|
718
|
|
719
|
public StringTemplate getBacklinkTemplate() {
|
720
|
return backlinkTemplate;
|
721
|
}
|
722
|
|
723
|
@Required
|
724
|
public void setBacklinkTemplate(final StringTemplate backlinkTemplate) {
|
725
|
this.backlinkTemplate = backlinkTemplate;
|
726
|
}
|
727
|
|
728
|
public List<String> getTitleTypes() {
|
729
|
return titleTypes;
|
730
|
}
|
731
|
|
732
|
public void setTitleTypes(final List<String> titleTypes) {
|
733
|
this.titleTypes = titleTypes;
|
734
|
}
|
735
|
|
736
|
public Set<String> getEntityTypes() {
|
737
|
return entityTypes;
|
738
|
}
|
739
|
|
740
|
public void setEntityTypes(final Set<String> entityTypes) {
|
741
|
this.entityTypes = entityTypes;
|
742
|
}
|
743
|
|
744
|
public BacklinkTypeMatcher getBacklinkTypeMatcher() {
|
745
|
return backlinkTypeMatcher;
|
746
|
}
|
747
|
|
748
|
@Required
|
749
|
public void setBacklinkTypeMatcher(final BacklinkTypeMatcher backlinkTypeMatcher) {
|
750
|
this.backlinkTypeMatcher = backlinkTypeMatcher;
|
751
|
}
|
752
|
|
753
|
public int getMAX_NUMBER_OF_RELS() {
|
754
|
return MAX_NUMBER_OF_RELS;
|
755
|
}
|
756
|
|
757
|
@Required
|
758
|
public void setMAX_NUMBER_OF_RELS(final int mAXNUMBEROFRELS) {
|
759
|
MAX_NUMBER_OF_RELS = mAXNUMBEROFRELS;
|
760
|
}
|
761
|
|
762
|
public static class UpdateOperation {
|
763
|
|
764
|
private DBObject find;
|
765
|
private DBObject update;
|
766
|
|
767
|
public UpdateOperation(final DBObject find, final DBObject update) {
|
768
|
super();
|
769
|
this.find = find;
|
770
|
this.update = update;
|
771
|
}
|
772
|
|
773
|
public DBObject getFind() {
|
774
|
return find;
|
775
|
}
|
776
|
|
777
|
public void setFind(final DBObject find) {
|
778
|
this.find = find;
|
779
|
}
|
780
|
|
781
|
public DBObject getUpdate() {
|
782
|
return update;
|
783
|
}
|
784
|
|
785
|
public void setUpdate(final DBObject update) {
|
786
|
this.update = update;
|
787
|
}
|
788
|
}
|
789
|
|
790
|
static class ToEmit {
|
791
|
|
792
|
private String targetId;
|
793
|
private String elementType;
|
794
|
|
795
|
public ToEmit(final String targetId, final String elementType) {
|
796
|
super();
|
797
|
this.targetId = targetId;
|
798
|
this.elementType = elementType;
|
799
|
}
|
800
|
|
801
|
public String getTargetId() {
|
802
|
return targetId;
|
803
|
}
|
804
|
|
805
|
public void setTargetId(final String targetId) {
|
806
|
this.targetId = targetId;
|
807
|
}
|
808
|
|
809
|
public String getElementType() {
|
810
|
return elementType;
|
811
|
}
|
812
|
|
813
|
public void setElementType(final String elementType) {
|
814
|
this.elementType = elementType;
|
815
|
}
|
816
|
}
|
817
|
|
818
|
}
|