Project

General

Profile

1
package eu.dnetlib.efg.backlinks;
2

    
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.*;
6
import java.util.concurrent.ArrayBlockingQueue;
7
import java.util.concurrent.BlockingQueue;
8
import java.util.concurrent.ThreadPoolExecutor;
9
import java.util.concurrent.TimeUnit;
10
import javax.xml.namespace.QName;
11
import javax.xml.parsers.DocumentBuilder;
12
import javax.xml.parsers.DocumentBuilderFactory;
13
import javax.xml.parsers.ParserConfigurationException;
14
import javax.xml.stream.*;
15
import javax.xml.stream.events.EndElement;
16
import javax.xml.stream.events.StartElement;
17
import javax.xml.stream.events.XMLEvent;
18
import javax.xml.transform.*;
19
import javax.xml.transform.dom.DOMSource;
20
import javax.xml.transform.stream.StreamResult;
21
import javax.xml.transform.stream.StreamSource;
22

    
23
import com.mongodb.BasicDBObject;
24
import com.mongodb.DBObject;
25
import com.mongodb.client.MongoCollection;
26
import com.mongodb.client.MongoDatabase;
27
import com.mongodb.client.model.Filters;
28
import com.mongodb.client.model.UpdateOptions;
29
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
30
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
31
import eu.dnetlib.rmi.data.MDStoreServiceException;
32
import org.antlr.stringtemplate.StringTemplate;
33
import org.apache.commons.lang3.StringEscapeUtils;
34
import org.apache.commons.logging.Log;
35
import org.apache.commons.logging.LogFactory;
36
import org.bson.conversions.Bson;
37
import org.springframework.beans.factory.annotation.Autowired;
38
import org.springframework.beans.factory.annotation.Required;
39
import org.springframework.core.io.Resource;
40
import org.w3c.dom.Document;
41
import org.w3c.dom.Element;
42
import org.w3c.dom.Node;
43

    
44
/**
45
 * handle backlinks
46
 * <p/>
47
 * AvCreation Collection Corporate NonAVCreation Person
48
 *
49
 * @author marko
50
 */
51
public class MongoBuildBacklinks {
52

    
53
	private static final Log log = LogFactory.getLog(MongoBuildBacklinks.class); // NOPMD by marko on 11/24/08 5:02 PM
54
	private final int queueSize = 80;
55
	private final int cpus = Runtime.getRuntime().availableProcessors();
56
	private final ThreadLocal<XMLInputFactory> factory = new ThreadLocal<XMLInputFactory>() {
57

    
58
		@Override
59
		protected XMLInputFactory initialValue() {
60
			return XMLInputFactory.newInstance();
61
		}
62
	};
63
	private final ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
64

    
65
		@Override
66
		protected XMLOutputFactory initialValue() {
67
			return XMLOutputFactory.newInstance();
68
		}
69
	};
70
	private final ThreadLocal<Transformer> serializer = new ThreadLocal<Transformer>() {
71

    
72
		@Override
73
		protected Transformer initialValue() {
74
			final TransformerFactory factory = TransformerFactory.newInstance();
75
			try {
76
				final Transformer trans = factory.newTransformer();
77
				trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
78
				return trans;
79
			} catch (final TransformerConfigurationException e) {
80
				throw new IllegalStateException(e);
81
			}
82
		}
83
	};
84
	private final ThreadLocal<DocumentBuilder> docBuilder = new ThreadLocal<DocumentBuilder>() {
85

    
86
		@Override
87
		protected DocumentBuilder initialValue() {
88
			final DocumentBuilderFactory dbfac = DocumentBuilderFactory.newInstance();
89
			try {
90
				return dbfac.newDocumentBuilder();
91
			} catch (final ParserConfigurationException e) {
92
				throw new IllegalStateException(e);
93
			}
94
		}
95
	};
96
	public UpdateOperation sentinel = new UpdateOperation(null, null);
97
	@Autowired
98
	private MDStoreTransactionManagerImpl transactionManager;
99
	private BacklinkTypeMatcher backlinkTypeMatcher;
100
	private int MAX_NUMBER_OF_RELS = 1000;
101
	private List<String> relationTypes = new ArrayList<>();
102
	private Resource fixLinksXslt;
103
	private StringTemplate backlinkTemplate;
104
	private Set<String> entityTypes;
105
	private List<String> titleTypes;
106

    
107
	public void process(final String mdId, final DoneCallback doneCallback) throws MDStoreServiceException {
108
		try {
109
			log.debug("generating backlink for mdstore ID:" + mdId);
110

    
111
			ThreadPoolExecutor executor = createExecutor();
112

    
113
			final String internalId = transactionManager.readMdStore(mdId);
114

    
115
			final MongoDatabase db = transactionManager.getDb();
116

    
117
			final MongoCollection<DBObject> backlinks = db.getCollection(mdId + "backlinks", DBObject.class);
118

    
119
			backlinks.deleteMany(new BasicDBObject());
120

    
121
			final BlockingQueue<UpdateOperation> emitQueue = new ArrayBlockingQueue<>(500);
122

    
123
			final MongoCollection<DBObject> data = db.getCollection(internalId, DBObject.class);
124

    
125
			final long size = data.count();
126
			double currentPosition = 0;
127

    
128
			backlinks.drop();
129
			backlinks.createIndex(new BasicDBObject("id", 1));
130
			data.createIndex(new BasicDBObject("id", 1));
131
			data.createIndex(new BasicDBObject("originalId", 1));
132

    
133
			log.info("start background updating thread");
134
			final Thread backgroundUpdating = startBackgroundUpdating(emitQueue, backlinks);
135

    
136
			log.info("start parsing links");
137
			currentPosition = parseLinks(currentPosition, executor, data, emitQueue);
138

    
139
			executor.shutdown();
140
			executor.awaitTermination(10, TimeUnit.MINUTES);
141
			log.info("finish parallel part: grouping " + executor.isShutdown());
142

    
143
			emitQueue.put(sentinel);
144
			backgroundUpdating.join();
145

    
146
			log.info("finish background backlinks storing");
147

    
148
			executor = createExecutor();
149
			log.info("new executor " + executor.isShutdown());
150

    
151
			fixLinks(data, size, backlinks, executor, currentPosition);
152

    
153
			log.info("waiting for other executor");
154
			executor.shutdown();
155
			executor.awaitTermination(10, TimeUnit.MINUTES);
156
			log.info("finish parallel part: fixing");
157

    
158
			backlinks.createIndex(new BasicDBObject("id", 1));
159
			data.createIndex(new BasicDBObject("id", 1));
160
			data.createIndex(new BasicDBObject("originalId", 1));
161
			Map<String, String> paramsOut = new HashMap<>();
162
			doneCallback.call(paramsOut);
163

    
164
		} catch (final InterruptedException e) {
165
			throw new IllegalStateException(e);
166
		}
167
	}
168

    
169
	private void fixLinks(final MongoCollection<DBObject> data,
170
			final long size,
171
			final MongoCollection<DBObject> backlinks,
172
			final ThreadPoolExecutor executor,
173
			double currentPosition
174
	) {
175
		final double step = (1.0 * size) / backlinks.count();
176

    
177
		log.info("FIXLINKS");
178

    
179
		for (final DBObject link : backlinks.find()) {
180

    
181
			log.debug("FIXING LINK " + link);
182
			try {
183
				executor.submit((Runnable) () -> {
184
					try {
185
						@SuppressWarnings("unchecked")
186
						List<DBObject> links = (List<DBObject>) link.get("links");
187
						fixLinks(data, (String) link.get("id"), links);
188
					} catch (final Throwable e) {
189
						log.warn("problems fixing", e);
190
					}
191
				});
192
				currentPosition += step;
193
			} catch (final Throwable e) {
194
				log.warn("problems fixing", e);
195
			}
196
		}
197

    
198
		log.info("FINISH FIXLINKS");
199
	}
200

    
201
	private double parseLinks(double currentPosition,
202
			final ThreadPoolExecutor executor,
203
			final MongoCollection<DBObject> data,
204
			final BlockingQueue<UpdateOperation> emitQueue) {
205
		// log.info("PARSING LINKS " + collection);
206
		for (final DBObject current : data.find()) {
207
			try {
208
				// log.info("PARSING LINK " + current.get("originalId"));
209
				executor.submit((Runnable) () -> parseLinks((String) current.get("body"), emitQueue));
210
				currentPosition++;
211
			} catch (final Throwable e) {
212
				log.warn("problems parsing", e);
213
			}
214
		}
215
		return currentPosition;
216
	}
217

    
218
	private Thread startBackgroundUpdating(final BlockingQueue<UpdateOperation> queue, final MongoCollection<DBObject> coll) {
219
		final Thread background = new Thread(() -> {
220
			while (true) {
221
				try {
222
					final UpdateOperation record = queue.take();
223
					if (record == sentinel) {
224
						break;
225
					}
226

    
227
					final UpdateOptions op = new UpdateOptions();
228
					op.upsert(true);
229
					coll.updateOne((Bson) record.find, (Bson) record.update, op);
230
				} catch (final InterruptedException e) {
231
					log.fatal("got exception in background thread", e);
232
					throw new IllegalStateException(e);
233
				}
234
			}
235
		});
236
		background.start();
237
		return background;
238
	}
239

    
240
	protected ThreadPoolExecutor createExecutor() {
241
		return new ThreadPoolExecutor(cpus, cpus, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true),
242
				new ThreadPoolExecutor.CallerRunsPolicy());
243
	}
244

    
245
	private void fixLinks(final MongoCollection<DBObject> collection, final String id, final List<DBObject> links) {
246

    
247
		final Bson query = Filters.eq("originalId", id);
248
		final BasicDBObject source = (BasicDBObject) collection.find(query).first();
249

    
250
		if (source == null) {
251
			log.warn("object with originalId " + id + " doesn't exist or doesn't belong to this collection");
252
			return;
253
		}
254

    
255
		final String origBody = (String) source.get("body");
256
		log.info("UPDATING LINKS " + id);
257
		final String updated = updateLinks(id, origBody, links);
258
		// for (DBObject link : links) {
259
		// log.info(" link from " + id + " to: " + link.get("target"));
260
		// }
261

    
262
		if (!origBody.equalsIgnoreCase(updated)) {
263

    
264
			collection.updateOne(query, new BasicDBObject("$set", new BasicDBObject("body", updated)));
265
		}
266
	}
267

    
268
	public boolean isEfgEntity(final StartElement element) {
269
		return isEfgEntity(element.getName());
270
	}
271

    
272
	public boolean isEfgEntity(final EndElement element) {
273
		return isEfgEntity(element.getName());
274
	}
275

    
276
	public boolean isEfgEntity(final QName name) {
277
		return entityTypes.contains(name.getLocalPart());
278
	}
279

    
280
	public boolean isRelation(final XMLEvent event) {
281
		if (event.isStartElement()) return isRelation(event.asStartElement());
282
		return false;
283
	}
284

    
285
	public boolean isEndRelation(final XMLEvent event) {
286
		if (event.isEndElement()) return isRelation(event.asEndElement());
287
		return false;
288
	}
289

    
290
	public boolean isRelation(final StartElement element) {
291
		return isRelation(element.getName());
292
	}
293

    
294
	public boolean isRelation(final EndElement element) {
295
		return isRelation(element.getName());
296
	}
297

    
298
	public boolean isRelation(final QName name) {
299
		return name.getLocalPart().startsWith("rel");
300
	}
301

    
302
	public XMLEventReader fragment(final String source) throws XMLStreamException {
303
		return fragment(factory.get().createXMLEventReader(new StringReader(source)));
304
	}
305

    
306
	public XMLEventReader fragment(final XMLEventReader reader) throws XMLStreamException {
307
		return factory.get().createFilteredReader(reader, new EventFilter() {
308

    
309
			@Override
310
			public boolean accept(final XMLEvent evt) {
311
				return !(evt.isStartDocument() || evt.isEndDocument());
312
			}
313
		});
314
	}
315

    
316
	public String linkToFragment(final DBObject link) {
317
		StringTemplate template = new StringTemplate(backlinkTemplate.getTemplate());
318

    
319
		template.setAttribute("relType", prepareValueForTemplate(link.get("type")));
320
		template.setAttribute("targetId", prepareValueForTemplate(link.get("target")));
321
		template.setAttribute("title", prepareValueForTemplate(link.get("title")));
322
		template.setAttribute("name", prepareValueForTemplate(link.get("name")));
323
		template.setAttribute("type", prepareValueForTemplate(backlinkTypeMatcher.getInverseType(link.get("elementType").toString())));
324
		template.setAttribute("itemTypes", prepareValueForTemplate(link.get("itemTypes")));
325

    
326
		String res = template.toString();
327

    
328
		if (log.isDebugEnabled()) {
329
			log.debug("UPDATING adding link: " + res.replaceAll("\n", ""));
330
		}
331

    
332
		return res;
333
	}
334

    
335
	private Object prepareValueForTemplate(final Object obj) {
336
		if ((obj != null) && (obj instanceof String)) return StringEscapeUtils.escapeXml11(obj.toString());
337
		return obj;
338
	}
339

    
340
	private String maybeIdentifier(final XMLEventReader reader, final XMLEventWriter writer, final XMLEvent event) throws XMLStreamException {
341
		if (event.isStartElement() && event.asStartElement().getName().getLocalPart().equalsIgnoreCase("identifier")) {
342
			XMLEvent next = reader.peek();
343

    
344
			if (next.isCharacters()) return next.asCharacters().getData().trim();
345
		}
346
		return null;
347
	}
348

    
349
	public DBObject findLink(final String currentRelId, final List<DBObject> links) {
350
		for (DBObject link : links)
351
			if (link.get("target").equals(currentRelId)) return link;
352
		return null;
353
	}
354

    
355
	@SuppressWarnings("unchecked")
356
	public String updateLinks(final String id, final String body, final List<DBObject> links) {
357
		log.info("UPDATING links " + id);
358

    
359
		try {
360
			StringWriter res = new StringWriter();
361

    
362
			XMLEventReader reader = factory.get().createXMLEventReader(new StringReader(body));
363
			XMLEventWriter writer = outputFactory.get().createXMLEventWriter(res);
364

    
365
			Set<String> present = new HashSet<>();
366

    
367
			boolean inRelation = false;
368
			boolean hasItemType = false;
369
			String currentRelId = null;
370

    
371
			XMLEvent event = null;
372
			while (!(event = reader.nextEvent()).isEndDocument()) {
373

    
374
				if (isRelation(event)) {
375
					inRelation = true;
376
					currentRelId = null;
377
				} else if (isEndRelation(event)) {
378
					inRelation = false;
379

    
380
					if (!hasItemType) {
381
						DBObject obj = findLink(currentRelId, links);
382
						if (obj != null) {
383
							final Object itemTypes = obj.get("itemTypes");
384
							if (itemTypes != null) {
385
								for (String itemType : (List<String>) itemTypes) {
386
									writer.add(fragment("<efg:itemType xmlns:efg=\"http://www.europeanfilmgateway.eu/efg\" generated=\"true\">" + itemType
387
											+ "</efg:itemType>"));
388
								}
389
							}
390
						}
391
					}
392
				}
393

    
394
				if (inRelation) {
395
					// if current element is the identifier, mark the relation as seen.
396
					String relId = maybeIdentifier(reader, writer, event);
397
					if (relId != null) {
398
						currentRelId = relId;
399
						present.add(relId);
400
					}
401
					if (event.isStartElement() && event.asStartElement().getName().getLocalPart().equalsIgnoreCase("itemType")) {
402
						hasItemType = true;
403
					}
404
				}
405

    
406
				// flush new backlinks at the end of the record
407
				if (event.isEndElement()) {
408
					EndElement end = event.asEndElement();
409
					if (isEfgEntity(end)) {
410
						log.info("UPDATING end element is efgEntity");
411

    
412
						for (int i = 0; i < Math.min(links.size(), MAX_NUMBER_OF_RELS); i++) {
413
							DBObject link = links.get(i);
414
							// generate only those (back)links which are not already present.
415
							if (!present.contains(link.get("target"))) {
416
								writer.add(fragment(linkToFragment(link)));
417
							}
418
						}
419
					}
420
				}
421

    
422
				writer.add(event);
423
			}
424
			writer.close();
425
			return res.toString();
426
		} catch (Exception e) {
427
			throw new IllegalStateException(e);
428
		}
429

    
430
	}
431

    
432
	protected Document createLinksParams(final List<DBObject> links) throws ParserConfigurationException {
433
		final Document doc = docBuilder.get().newDocument();
434

    
435
		createLinksParams(links, doc);
436

    
437
		return doc;
438
	}
439

    
440
	protected Document createLinksParams(final List<DBObject> links, final Document doc) throws ParserConfigurationException {
441
		return createLinksParams(links, doc, doc);
442
	}
443

    
444
	protected Document createLinksParams(final List<DBObject> links, final Document doc, final Node container) throws ParserConfigurationException {
445
		final Element root = doc.createElement("links");
446

    
447
		container.appendChild(root);
448

    
449
		for (final DBObject link : links) {
450
			final Element node = doc.createElement("link");
451
			node.setAttribute("target", (String) link.get("target"));
452
			node.setAttribute("type", (String) link.get("type"));
453

    
454
			Arrays.asList("name", "title").stream().filter(attr -> link.get(attr) != null).forEach(attr -> {
455
				node.setAttribute(attr, (String) link.get(attr));
456
			});
457

    
458
			root.appendChild(node);
459
		}
460
		return doc;
461
	}
462

    
463
	protected String serialize(final Node doc) {
464
		try {
465
			// create string from xml tree
466
			final StringWriter sw = new StringWriter();
467
			final StreamResult result = new StreamResult(sw);
468
			final DOMSource source = new DOMSource(doc);
469

    
470
			serializer.get().transform(source, result);
471
			return sw.toString();
472

    
473
		} catch (final TransformerException e) {
474
			log.warn("cannot serialize", e);
475
		}
476
		return "";
477
	}
478

    
479
	public boolean moreSpecific(final String next, final String old, final List<String> categories) {
480
		if (old == null) return true;
481
		if (next == null) return false;
482
		return categories.indexOf(next.toLowerCase()) > categories.indexOf(old.toLowerCase());
483
	}
484

    
485
	private void parseLinks(final String record, final BlockingQueue<UpdateOperation> emitQueue) {
486
		// log.info("record: " + record);
487
		String recordId = null;
488
		String recordType = null;
489
		String relType = null;
490

    
491
		final List<ToEmit> toEmit = new ArrayList<>();
492
		final Map<String, String> params = new HashMap<>();
493

    
494
		// for persons:
495
		String firstName = null;
496
		String lastName = "";
497

    
498
		// for creations
499
		String title = null;
500
		String titleType = null;
501

    
502
		Stack<String> elementStack = new Stack<String>();
503
		elementStack.push("/");
504

    
505
		try {
506
			final XMLStreamReader parser = factory.get().createXMLStreamReader(new StreamSource(new StringReader(record)));
507
			while (parser.hasNext()) {
508
				int event = parser.next();
509

    
510
				if (event == XMLStreamConstants.END_ELEMENT) {
511
					pop(elementStack);
512
				} else if (event == XMLStreamConstants.START_ELEMENT) {
513
					final String localName = parser.getLocalName();
514
					push(elementStack, localName);
515

    
516
					if ("efgEntity".equalsIgnoreCase(localName)) {
517
						while (parser.hasNext()) {
518
							event = parser.next();
519

    
520
							if (event == XMLStreamConstants.START_ELEMENT) {
521
								recordType = parser.getLocalName();
522
								relType = normalizeRecordType(recordType);
523

    
524
								push(elementStack, recordType);
525
								break;
526
							}
527
						}
528
					}
529

    
530
					if ("identifier".equalsIgnoreCase(localName) && "efgEntity".equalsIgnoreCase(grandParent(elementStack))) {
531
						parser.next();
532
						recordId = parser.getText();
533
					} else if ("identifier".equalsIgnoreCase(localName)) {
534
						parser.next();
535
						// log.info("CCCCC: is an id but not the right depth: " + parser.getText());
536
						// log.info("stack " + elementStack);
537
					} else if ("person".equalsIgnoreCase(recordType)) {
538
						if ("name".equalsIgnoreCase(localName)) {
539
							final String part = parser.getAttributeValue(null, "part");
540
							parser.next();
541
							String value = parser.getText();
542
							if (value != null) {
543
								value = value.trim();
544
							}
545

    
546
							if ("forename".equalsIgnoreCase(part) || "Forename(s)".equalsIgnoreCase(part)) {
547
								firstName = value;
548
							} else if ("surname".equalsIgnoreCase(part) || "Family Name".equalsIgnoreCase(part)) {
549
								lastName = value;
550
							} else if ("stem".equalsIgnoreCase(part)) {
551
								lastName = value;
552
							}
553
						}
554
					} else if (isCreation(recordType)) {
555
						if ("title".equalsIgnoreCase(localName)) {
556
							String tmpTitle = null;
557
							String tmpType = null;
558

    
559
							while (parser.hasNext()) {
560
								event = parser.next();
561

    
562
								if (event == XMLStreamConstants.START_ELEMENT) {
563
									push(elementStack, parser.getLocalName());
564

    
565
									if ("text".equalsIgnoreCase(parser.getLocalName())) {
566
										event = parser.next();
567
										if (event == XMLStreamConstants.CHARACTERS) {
568
											tmpTitle = parser.getText();
569
										}
570
									} else if ("relation".equalsIgnoreCase(parser.getLocalName())) {
571
										event = parser.next();
572
										if (event == XMLStreamConstants.CHARACTERS) {
573
											tmpType = parser.getText();
574
										}
575
									}
576

    
577
								} else if (event == XMLStreamConstants.END_ELEMENT) {
578
									pop(elementStack);
579

    
580
									if ("title".equalsIgnoreCase(parser.getLocalName())) {
581
										tmpType = (tmpType == null) ? "n/a" : tmpType.trim();
582
										tmpTitle = (tmpTitle == null) ? "n/a" : tmpTitle.trim();
583

    
584
										if (moreSpecific(tmpType, titleType, titleTypes)) {
585
											title = tmpTitle;
586
											titleType = tmpType;
587
										}
588

    
589
										break;
590
									}
591
								}
592
							}
593
						}
594
					}
595

    
596
					if (relationTypes.contains(localName)) {
597
						ToEmit nextEmission = null;
598

    
599
						while (parser.hasNext()) {
600
							event = parser.next();
601

    
602
							if (event == XMLStreamConstants.START_ELEMENT) {
603
								push(elementStack, parser.getLocalName());
604

    
605
								if ("identifier".equalsIgnoreCase(parser.getLocalName())) {
606
									parser.next();
607

    
608
									// toEmit.add(new ToEmit(parser.getText().trim(), "n/a"));
609
									nextEmission = new ToEmit(parser.getText().trim(), "n/a");
610
									// break;
611
								} else if ("type".equalsIgnoreCase(parser.getLocalName())) {
612
									parser.next();
613

    
614
									nextEmission.setElementType(parser.getText().trim());
615
									toEmit.add(nextEmission);
616
									break;
617
								}
618
							} else if (event == XMLStreamConstants.END_ELEMENT) {
619
								pop(elementStack);
620
							}
621
						}
622
					}
623
				}
624
			}
625

    
626
			if ("person".equalsIgnoreCase(recordType)) {
627
				String personName = lastName;
628
				if (firstName != null) {
629
					personName = firstName + " " + lastName;
630
				}
631

    
632
				params.put("name", personName);
633
			} else if (isCreation(recordType) && (title != null)) {
634
				params.put("title", title);
635
			}
636

    
637
			if (toEmit.isEmpty() && log.isDebugEnabled()) {
638
				log.debug("emit queue is empty for record " + recordId.trim());
639
			}
640

    
641
			for (final ToEmit emitting : toEmit) {
642
				emitBacklink(emitQueue, emitting.targetId, recordId.trim(), relType, emitting.elementType, params);
643
			}
644

    
645
		} catch (final XMLStreamException e) {
646
			log.info("some parsing exception, moving along");
647
		}
648

    
649
	}
650

    
651
	private String grandParent(final Stack<String> elementStack) {
652
		if (elementStack.size() <= 3) return "";
653
		String res = elementStack.get(elementStack.size() - 3);
654
		return res;
655
	}
656

    
657
	private void pop(final Stack<String> elementStack) {
658
		elementStack.pop();
659
	}
660

    
661
	private void push(final Stack<String> elementStack, final String localName) {
662
		elementStack.push(localName);
663
	}
664

    
665
	private boolean isCreation(final String recordType) {
666
		return "avcreation".equalsIgnoreCase(recordType) || "nonavcreation".equalsIgnoreCase(recordType);
667
	}
668

    
669
	private String normalizeRecordType(final String localName) {
670
		final String relName = "rel" + localName;
671

    
672
		for (final String type : relationTypes) {
673
			if (relName.equalsIgnoreCase(type)) return type;
674
		}
675
		return relName;
676
	}
677

    
678
	private void emitBacklink(final BlockingQueue<UpdateOperation> emitQueue,
679
			final String source,
680
			final String target,
681
			final String targetType,
682
			final String elementType,
683
			final Map<String, String> params) {
684

    
685
		final DBObject pair = new BasicDBObject();
686
		pair.put("target", target);
687
		pair.put("type", targetType);
688
		pair.put("elementType", elementType);
689
		pair.putAll(params);
690

    
691
		final DBObject find = new BasicDBObject("id", source);
692

    
693
		final DBObject update = new BasicDBObject("$push", new BasicDBObject("links", pair));
694

    
695
		try {
696
			emitQueue.put(new UpdateOperation(find, update));
697
		} catch (final InterruptedException e) {
698
			throw new IllegalStateException(e);
699
		}
700
	}
701

    
702
	public List<String> getRelationTypes() {
703
		return relationTypes;
704
	}
705

    
706
	public void setRelationTypes(final List<String> relationTypes) {
707
		this.relationTypes = relationTypes;
708
	}
709

    
710
	public Resource getFixLinksXslt() {
711
		return fixLinksXslt;
712
	}
713

    
714
	@Required
715
	public void setFixLinksXslt(final Resource fixLinksXslt) {
716
		this.fixLinksXslt = fixLinksXslt;
717
	}
718

    
719
	public StringTemplate getBacklinkTemplate() {
720
		return backlinkTemplate;
721
	}
722

    
723
	@Required
724
	public void setBacklinkTemplate(final StringTemplate backlinkTemplate) {
725
		this.backlinkTemplate = backlinkTemplate;
726
	}
727

    
728
	public List<String> getTitleTypes() {
729
		return titleTypes;
730
	}
731

    
732
	public void setTitleTypes(final List<String> titleTypes) {
733
		this.titleTypes = titleTypes;
734
	}
735

    
736
	public Set<String> getEntityTypes() {
737
		return entityTypes;
738
	}
739

    
740
	public void setEntityTypes(final Set<String> entityTypes) {
741
		this.entityTypes = entityTypes;
742
	}
743

    
744
	public BacklinkTypeMatcher getBacklinkTypeMatcher() {
745
		return backlinkTypeMatcher;
746
	}
747

    
748
	@Required
749
	public void setBacklinkTypeMatcher(final BacklinkTypeMatcher backlinkTypeMatcher) {
750
		this.backlinkTypeMatcher = backlinkTypeMatcher;
751
	}
752

    
753
	public int getMAX_NUMBER_OF_RELS() {
754
		return MAX_NUMBER_OF_RELS;
755
	}
756

    
757
	@Required
758
	public void setMAX_NUMBER_OF_RELS(final int mAXNUMBEROFRELS) {
759
		MAX_NUMBER_OF_RELS = mAXNUMBEROFRELS;
760
	}
761

    
762
	public static class UpdateOperation {
763

    
764
		private DBObject find;
765
		private DBObject update;
766

    
767
		public UpdateOperation(final DBObject find, final DBObject update) {
768
			super();
769
			this.find = find;
770
			this.update = update;
771
		}
772

    
773
		public DBObject getFind() {
774
			return find;
775
		}
776

    
777
		public void setFind(final DBObject find) {
778
			this.find = find;
779
		}
780

    
781
		public DBObject getUpdate() {
782
			return update;
783
		}
784

    
785
		public void setUpdate(final DBObject update) {
786
			this.update = update;
787
		}
788
	}
789

    
790
	static class ToEmit {
791

    
792
		private String targetId;
793
		private String elementType;
794

    
795
		public ToEmit(final String targetId, final String elementType) {
796
			super();
797
			this.targetId = targetId;
798
			this.elementType = elementType;
799
		}
800

    
801
		public String getTargetId() {
802
			return targetId;
803
		}
804

    
805
		public void setTargetId(final String targetId) {
806
			this.targetId = targetId;
807
		}
808

    
809
		public String getElementType() {
810
			return elementType;
811
		}
812

    
813
		public void setElementType(final String elementType) {
814
			this.elementType = elementType;
815
		}
816
	}
817

    
818
}
(3-3/4)