Project

General

Profile

« Previous | Next » 

Revision 42161

rename of packages and classes

View differences:

modules/dnet-efg/trunk/src/main/java/efg/workflows/nodes/ExecuteMDStorePluginJobNode.java
1
package efg.workflows.nodes;
2

  
3
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
4
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
5
import eu.dnetlib.msro.workflows.procs.Env;
6
import eu.dnetlib.msro.workflows.procs.Token;
7
import eu.dnetlib.rmi.data.MDStoreService;
8

  
9
/**
10
 * Created by sandro on 2/22/16.
11
 */
12
public class ExecuteMDStorePluginJobNode extends BlackboardJobNode {
13

  
14
	private String mdStoreId;
15

  
16
	private String pluginName;
17

  
18
	public String getPluginName() {
19
		return this.pluginName;
20
	}
21

  
22
	public void setPluginName(final String pluginName) {
23
		this.pluginName = pluginName;
24
	}
25

  
26
	public String getMdStoreId() {
27
		return this.mdStoreId;
28
	}
29

  
30
	public void setMdStoreId(final String mdStoreId) {
31
		this.mdStoreId = mdStoreId;
32
	}
33

  
34
	@Override
35
	protected String obtainServiceId(final Env env) {
36
		return getServiceLocator().getServiceId(MDStoreService.class, getMdStoreId());
37
	}
38

  
39
	@Override
40
	protected void prepareJob(final BlackboardJob job, final Token token) throws Exception {
41

  
42
		job.setAction("RUN_PLUGIN");
43
		job.getParameters().put("plugin.name", getPluginName());
44
		job.getParameters().put("mdStoreId", getMdStoreId());
45

  
46
	}
47
}
modules/dnet-efg/trunk/src/main/java/efg/backlink/BacklinkTypeMatcher.java
1
package efg.backlink;
2

  
3
/**
4
 * Created by sandro on 2/22/16.
5
 */
6
public interface BacklinkTypeMatcher {
7

  
8
	String getInverseType(String type);
9
}
10

  
11

  
modules/dnet-efg/trunk/src/main/java/efg/backlink/PropertyBacklinkTypeMatcherImpl.java
1
package efg.backlink;
2

  
3
import java.util.HashMap;
4
import java.util.Map;
5

  
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
public class PropertyBacklinkTypeMatcherImpl implements BacklinkTypeMatcher {
10

  
11
	private static final Log log = LogFactory.getLog(PropertyBacklinkTypeMatcherImpl.class); // NOPMD by marko on 11/24/08 5:02 PM
12

  
13
	private Map<String, String> mapRules;
14

  
15
	public PropertyBacklinkTypeMatcherImpl(String rules) {
16
		super();
17

  
18
		this.mapRules = new HashMap<String, String>();
19

  
20
		for (String i : rules.split(";")) {
21
			String[] arr = i.split("<->");
22
			if (arr.length == 2) {
23
				String arr0 = arr[0].trim();
24
				String arr1 = arr[1].trim();
25
				mapRules.put(arr0.toLowerCase(), arr1);
26
				mapRules.put(arr1.toLowerCase(), arr0);
27
			}
28
		}
29

  
30
		for (Map.Entry<String, String> e : mapRules.entrySet()) {
31
			log.info("MapRules key: " + e.getKey() + " - value: " + e.getValue());
32
		}
33
	}
34

  
35
	@Override
36
	public String getInverseType(String type) {
37
		if (type != null && mapRules.containsKey(type.toLowerCase().trim()))
38
			return mapRules.get(type.toLowerCase().trim());
39
		return type;
40
	}
41

  
42
}
modules/dnet-efg/trunk/src/main/java/efg/backlink/BackLinkPlugin.java
1
package efg.backlink;
2

  
3
import java.util.Map;
4

  
5
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
6
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
7
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
8
import eu.dnetlib.rmi.data.MDStoreServiceException;
9
import org.springframework.beans.factory.annotation.Autowired;
10

  
11
/**
12
 * Created by Sandro La Bruzzo on 12/2/15.
13
 */
14

  
15
public class BackLinkPlugin implements MDStorePlugin {
16

  
17
	@Autowired
18
	MongoBuildBacklinks mongoBuildBacklinks;
19

  
20
	@Override
21
	public void run(MDStoreDao dao, Map<String, String> params, DoneCallback doneCallback) throws MDStoreServiceException {
22

  
23
		final String id = params.get("mdStoreId");
24
		mongoBuildBacklinks.process(id, doneCallback);
25

  
26
	}
27
}
modules/dnet-efg/trunk/src/main/java/efg/backlink/MongoBuildBacklinks.java
1
package efg.backlink;
2

  
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.*;
6
import java.util.concurrent.ArrayBlockingQueue;
7
import java.util.concurrent.BlockingQueue;
8
import java.util.concurrent.ThreadPoolExecutor;
9
import java.util.concurrent.TimeUnit;
10
import javax.xml.namespace.QName;
11
import javax.xml.parsers.DocumentBuilder;
12
import javax.xml.parsers.DocumentBuilderFactory;
13
import javax.xml.parsers.ParserConfigurationException;
14
import javax.xml.stream.*;
15
import javax.xml.stream.events.EndElement;
16
import javax.xml.stream.events.StartElement;
17
import javax.xml.stream.events.XMLEvent;
18
import javax.xml.transform.*;
19
import javax.xml.transform.dom.DOMSource;
20
import javax.xml.transform.stream.StreamResult;
21
import javax.xml.transform.stream.StreamSource;
22

  
23
import com.mongodb.BasicDBObject;
24
import com.mongodb.DBObject;
25
import com.mongodb.client.MongoCollection;
26
import com.mongodb.client.MongoDatabase;
27
import com.mongodb.client.model.Filters;
28
import com.mongodb.client.model.UpdateOptions;
29
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
30
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
31
import eu.dnetlib.rmi.data.MDStoreServiceException;
32
import org.antlr.stringtemplate.StringTemplate;
33
import org.apache.commons.lang3.StringEscapeUtils;
34
import org.apache.commons.logging.Log;
35
import org.apache.commons.logging.LogFactory;
36
import org.bson.conversions.Bson;
37
import org.springframework.beans.factory.annotation.Autowired;
38
import org.springframework.beans.factory.annotation.Required;
39
import org.springframework.core.io.Resource;
40
import org.w3c.dom.Document;
41
import org.w3c.dom.Element;
42
import org.w3c.dom.Node;
43

  
44
/**
45
 * handle backlinks
46
 * <p/>
47
 * AvCreation Collection Corporate NonAVCreation Person
48
 *
49
 * @author marko
50
 */
51
public class MongoBuildBacklinks {
52

  
53
	private static final Log log = LogFactory.getLog(MongoBuildBacklinks.class); // NOPMD by marko on 11/24/08 5:02 PM
54
	private final int queueSize = 80;
55
	private final int cpus = Runtime.getRuntime().availableProcessors();
56
	private final ThreadLocal<XMLInputFactory> factory = new ThreadLocal<XMLInputFactory>() {
57

  
58
		@Override
59
		protected XMLInputFactory initialValue() {
60
			return XMLInputFactory.newInstance();
61
		}
62
	};
63
	private final ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
64

  
65
		@Override
66
		protected XMLOutputFactory initialValue() {
67
			return XMLOutputFactory.newInstance();
68
		}
69
	};
70
	private final ThreadLocal<Transformer> serializer = new ThreadLocal<Transformer>() {
71

  
72
		@Override
73
		protected Transformer initialValue() {
74
			final TransformerFactory factory = TransformerFactory.newInstance();
75
			try {
76
				final Transformer trans = factory.newTransformer();
77
				trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
78
				return trans;
79
			} catch (final TransformerConfigurationException e) {
80
				throw new IllegalStateException(e);
81
			}
82
		}
83
	};
84
	private final ThreadLocal<DocumentBuilder> docBuilder = new ThreadLocal<DocumentBuilder>() {
85

  
86
		@Override
87
		protected DocumentBuilder initialValue() {
88
			final DocumentBuilderFactory dbfac = DocumentBuilderFactory.newInstance();
89
			try {
90
				return dbfac.newDocumentBuilder();
91
			} catch (final ParserConfigurationException e) {
92
				throw new IllegalStateException(e);
93
			}
94
		}
95
	};
96
	public UpdateOperation sentinel = new UpdateOperation(null, null);
97
	@Autowired
98
	private MDStoreTransactionManagerImpl transactionManager;
99
	private BacklinkTypeMatcher backlinkTypeMatcher;
100
	private int MAX_NUMBER_OF_RELS = 1000;
101
	private List<String> relationTypes = new ArrayList<>();
102
	private Resource fixLinksXslt;
103
	private StringTemplate backlinkTemplate;
104
	private Set<String> entityTypes;
105
	private List<String> titleTypes;
106

  
107
	public void process(final String mdId, final DoneCallback doneCallback) throws MDStoreServiceException {
108
		try {
109
			log.debug("generating backlink for mdstore ID:" + mdId);
110

  
111
			ThreadPoolExecutor executor = createExecutor();
112

  
113
			final String internalId = transactionManager.readMdStore(mdId);
114

  
115
			final MongoDatabase db = transactionManager.getDb();
116

  
117
			final MongoCollection<DBObject> backlinks = db.getCollection(mdId + "backlinks", DBObject.class);
118

  
119
			backlinks.deleteMany(new BasicDBObject());
120

  
121
			final BlockingQueue<UpdateOperation> emitQueue = new ArrayBlockingQueue<>(500);
122

  
123
			final MongoCollection<DBObject> data = db.getCollection(internalId, DBObject.class);
124

  
125
			final long size = data.count();
126
			double currentPosition = 0;
127

  
128
			backlinks.drop();
129
			backlinks.createIndex(new BasicDBObject("id", 1));
130
			data.createIndex(new BasicDBObject("id", 1));
131
			data.createIndex(new BasicDBObject("originalId", 1));
132

  
133
			log.info("start background updating thread");
134
			final Thread backgroundUpdating = startBackgroundUpdating(emitQueue, backlinks);
135

  
136
			log.info("start parsing links");
137
			currentPosition = parseLinks(currentPosition, executor, data, emitQueue);
138

  
139
			executor.shutdown();
140
			executor.awaitTermination(10, TimeUnit.MINUTES);
141
			log.info("finish parallel part: grouping " + executor.isShutdown());
142

  
143
			emitQueue.put(sentinel);
144
			backgroundUpdating.join();
145

  
146
			log.info("finish background backlinks storing");
147

  
148
			executor = createExecutor();
149
			log.info("new executor " + executor.isShutdown());
150

  
151
			fixLinks(data, size, backlinks, executor, currentPosition);
152

  
153
			log.info("waiting for other executor");
154
			executor.shutdown();
155
			executor.awaitTermination(10, TimeUnit.MINUTES);
156
			log.info("finish parallel part: fixing");
157

  
158
			backlinks.createIndex(new BasicDBObject("id", 1));
159
			data.createIndex(new BasicDBObject("id", 1));
160
			data.createIndex(new BasicDBObject("originalId", 1));
161
			Map<String, String> paramsOut = new HashMap<>();
162
			doneCallback.call(paramsOut);
163

  
164
		} catch (final InterruptedException e) {
165
			throw new IllegalStateException(e);
166
		}
167
	}
168

  
169
	private void fixLinks(final MongoCollection<DBObject> data,
170
			final long size,
171
			final MongoCollection<DBObject> backlinks,
172
			final ThreadPoolExecutor executor,
173
			double currentPosition
174
	) {
175
		final double step = (1.0 * size) / backlinks.count();
176

  
177
		log.info("FIXLINKS");
178

  
179
		for (final DBObject link : backlinks.find()) {
180

  
181
			log.debug("FIXING LINK " + link);
182
			try {
183
				executor.submit((Runnable) () -> {
184
					try {
185
						@SuppressWarnings("unchecked")
186
						List<DBObject> links = (List<DBObject>) link.get("links");
187
						fixLinks(data, (String) link.get("id"), links);
188
					} catch (final Throwable e) {
189
						log.warn("problems fixing", e);
190
					}
191
				});
192
				currentPosition += step;
193
			} catch (final Throwable e) {
194
				log.warn("problems fixing", e);
195
			}
196
		}
197

  
198
		log.info("FINISH FIXLINKS");
199
	}
200

  
201
	private double parseLinks(double currentPosition,
202
			final ThreadPoolExecutor executor,
203
			final MongoCollection<DBObject> data,
204
			final BlockingQueue<UpdateOperation> emitQueue) {
205
		// log.info("PARSING LINKS " + collection);
206
		for (final DBObject current : data.find()) {
207
			try {
208
				// log.info("PARSING LINK " + current.get("originalId"));
209
				executor.submit((Runnable) () -> parseLinks((String) current.get("body"), emitQueue));
210
				currentPosition++;
211
			} catch (final Throwable e) {
212
				log.warn("problems parsing", e);
213
			}
214
		}
215
		return currentPosition;
216
	}
217

  
218
	private Thread startBackgroundUpdating(final BlockingQueue<UpdateOperation> queue, final MongoCollection<DBObject> coll) {
219
		final Thread background = new Thread(() -> {
220
			while (true) {
221
				try {
222
					final UpdateOperation record = queue.take();
223
					if (record == sentinel) {
224
						break;
225
					}
226

  
227
					final UpdateOptions op = new UpdateOptions();
228
					op.upsert(true);
229
					coll.updateOne((Bson) record.find, (Bson) record.update, op);
230
				} catch (final InterruptedException e) {
231
					log.fatal("got exception in background thread", e);
232
					throw new IllegalStateException(e);
233
				}
234
			}
235
		});
236
		background.start();
237
		return background;
238
	}
239

  
240
	protected ThreadPoolExecutor createExecutor() {
241
		return new ThreadPoolExecutor(cpus, cpus, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true),
242
				new ThreadPoolExecutor.CallerRunsPolicy());
243
	}
244

  
245
	private void fixLinks(final MongoCollection<DBObject> collection, final String id, final List<DBObject> links) {
246

  
247
		final Bson query = Filters.eq("originalId", id);
248
		final BasicDBObject source = (BasicDBObject) collection.find(query).first();
249

  
250
		if (source == null) {
251
			log.warn("object with originalId " + id + " doesn't exist or doesn't belong to this collection");
252
			return;
253
		}
254

  
255
		final String origBody = (String) source.get("body");
256
		log.info("UPDATING LINKS " + id);
257
		final String updated = updateLinks(id, origBody, links);
258
		// for (DBObject link : links) {
259
		// log.info(" link from " + id + " to: " + link.get("target"));
260
		// }
261

  
262
		if (!origBody.equalsIgnoreCase(updated)) {
263

  
264
			collection.updateOne(query, new BasicDBObject("$set", new BasicDBObject("body", updated)));
265
		}
266
	}
267

  
268
	public boolean isEfgEntity(final StartElement element) {
269
		return isEfgEntity(element.getName());
270
	}
271

  
272
	public boolean isEfgEntity(final EndElement element) {
273
		return isEfgEntity(element.getName());
274
	}
275

  
276
	public boolean isEfgEntity(final QName name) {
277
		return entityTypes.contains(name.getLocalPart());
278
	}
279

  
280
	public boolean isRelation(final XMLEvent event) {
281
		if (event.isStartElement()) return isRelation(event.asStartElement());
282
		return false;
283
	}
284

  
285
	public boolean isEndRelation(final XMLEvent event) {
286
		if (event.isEndElement()) return isRelation(event.asEndElement());
287
		return false;
288
	}
289

  
290
	public boolean isRelation(final StartElement element) {
291
		return isRelation(element.getName());
292
	}
293

  
294
	public boolean isRelation(final EndElement element) {
295
		return isRelation(element.getName());
296
	}
297

  
298
	public boolean isRelation(final QName name) {
299
		return name.getLocalPart().startsWith("rel");
300
	}
301

  
302
	public XMLEventReader fragment(final String source) throws XMLStreamException {
303
		return fragment(factory.get().createXMLEventReader(new StringReader(source)));
304
	}
305

  
306
	public XMLEventReader fragment(final XMLEventReader reader) throws XMLStreamException {
307
		return factory.get().createFilteredReader(reader, new EventFilter() {
308

  
309
			@Override
310
			public boolean accept(final XMLEvent evt) {
311
				return !(evt.isStartDocument() || evt.isEndDocument());
312
			}
313
		});
314
	}
315

  
316
	public String linkToFragment(final DBObject link) {
317
		StringTemplate template = new StringTemplate(backlinkTemplate.getTemplate());
318

  
319
		template.setAttribute("relType", prepareValueForTemplate(link.get("type")));
320
		template.setAttribute("targetId", prepareValueForTemplate(link.get("target")));
321
		template.setAttribute("title", prepareValueForTemplate(link.get("title")));
322
		template.setAttribute("name", prepareValueForTemplate(link.get("name")));
323
		template.setAttribute("type", prepareValueForTemplate(backlinkTypeMatcher.getInverseType(link.get("elementType").toString())));
324
		template.setAttribute("itemTypes", prepareValueForTemplate(link.get("itemTypes")));
325

  
326
		String res = template.toString();
327

  
328
		if (log.isDebugEnabled()) {
329
			log.debug("UPDATING adding link: " + res.replaceAll("\n", ""));
330
		}
331

  
332
		return res;
333
	}
334

  
335
	private Object prepareValueForTemplate(final Object obj) {
336
		if ((obj != null) && (obj instanceof String)) return StringEscapeUtils.escapeXml11(obj.toString());
337
		return obj;
338
	}
339

  
340
	private String maybeIdentifier(final XMLEventReader reader, final XMLEventWriter writer, final XMLEvent event) throws XMLStreamException {
341
		if (event.isStartElement() && event.asStartElement().getName().getLocalPart().equalsIgnoreCase("identifier")) {
342
			XMLEvent next = reader.peek();
343

  
344
			if (next.isCharacters()) return next.asCharacters().getData().trim();
345
		}
346
		return null;
347
	}
348

  
349
	public DBObject findLink(final String currentRelId, final List<DBObject> links) {
350
		for (DBObject link : links)
351
			if (link.get("target").equals(currentRelId)) return link;
352
		return null;
353
	}
354

  
355
	@SuppressWarnings("unchecked")
356
	public String updateLinks(final String id, final String body, final List<DBObject> links) {
357
		log.info("UPDATING links " + id);
358

  
359
		try {
360
			StringWriter res = new StringWriter();
361

  
362
			XMLEventReader reader = factory.get().createXMLEventReader(new StringReader(body));
363
			XMLEventWriter writer = outputFactory.get().createXMLEventWriter(res);
364

  
365
			Set<String> present = new HashSet<>();
366

  
367
			boolean inRelation = false;
368
			boolean hasItemType = false;
369
			String currentRelId = null;
370

  
371
			XMLEvent event = null;
372
			while (!(event = reader.nextEvent()).isEndDocument()) {
373

  
374
				if (isRelation(event)) {
375
					inRelation = true;
376
					currentRelId = null;
377
				} else if (isEndRelation(event)) {
378
					inRelation = false;
379

  
380
					if (!hasItemType) {
381
						DBObject obj = findLink(currentRelId, links);
382
						if (obj != null) {
383
							final Object itemTypes = obj.get("itemTypes");
384
							if (itemTypes != null) {
385
								for (String itemType : (List<String>) itemTypes) {
386
									writer.add(fragment("<efg:itemType xmlns:efg=\"http://www.europeanfilmgateway.eu/efg\" generated=\"true\">" + itemType
387
											+ "</efg:itemType>"));
388
								}
389
							}
390
						}
391
					}
392
				}
393

  
394
				if (inRelation) {
395
					// if current element is the identifier, mark the relation as seen.
396
					String relId = maybeIdentifier(reader, writer, event);
397
					if (relId != null) {
398
						currentRelId = relId;
399
						present.add(relId);
400
					}
401
					if (event.isStartElement() && event.asStartElement().getName().getLocalPart().equalsIgnoreCase("itemType")) {
402
						hasItemType = true;
403
					}
404
				}
405

  
406
				// flush new backlinks at the end of the record
407
				if (event.isEndElement()) {
408
					EndElement end = event.asEndElement();
409
					if (isEfgEntity(end)) {
410
						log.info("UPDATING end element is efgEntity");
411

  
412
						for (int i = 0; i < Math.min(links.size(), MAX_NUMBER_OF_RELS); i++) {
413
							DBObject link = links.get(i);
414
							// generate only those (back)links which are not already present.
415
							if (!present.contains(link.get("target"))) {
416
								writer.add(fragment(linkToFragment(link)));
417
							}
418
						}
419
					}
420
				}
421

  
422
				writer.add(event);
423
			}
424
			writer.close();
425
			return res.toString();
426
		} catch (Exception e) {
427
			throw new IllegalStateException(e);
428
		}
429

  
430
	}
431

  
432
	protected Document createLinksParams(final List<DBObject> links) throws ParserConfigurationException {
433
		final Document doc = docBuilder.get().newDocument();
434

  
435
		createLinksParams(links, doc);
436

  
437
		return doc;
438
	}
439

  
440
	protected Document createLinksParams(final List<DBObject> links, final Document doc) throws ParserConfigurationException {
441
		return createLinksParams(links, doc, doc);
442
	}
443

  
444
	protected Document createLinksParams(final List<DBObject> links, final Document doc, final Node container) throws ParserConfigurationException {
445
		final Element root = doc.createElement("links");
446

  
447
		container.appendChild(root);
448

  
449
		for (final DBObject link : links) {
450
			final Element node = doc.createElement("link");
451
			node.setAttribute("target", (String) link.get("target"));
452
			node.setAttribute("type", (String) link.get("type"));
453

  
454
			Arrays.asList("name", "title").stream().filter(attr -> link.get(attr) != null).forEach(attr -> {
455
				node.setAttribute(attr, (String) link.get(attr));
456
			});
457

  
458
			root.appendChild(node);
459
		}
460
		return doc;
461
	}
462

  
463
	protected String serialize(final Node doc) {
464
		try {
465
			// create string from xml tree
466
			final StringWriter sw = new StringWriter();
467
			final StreamResult result = new StreamResult(sw);
468
			final DOMSource source = new DOMSource(doc);
469

  
470
			serializer.get().transform(source, result);
471
			return sw.toString();
472

  
473
		} catch (final TransformerException e) {
474
			log.warn("cannot serialize", e);
475
		}
476
		return "";
477
	}
478

  
479
	public boolean moreSpecific(final String next, final String old, final List<String> categories) {
480
		if (old == null) return true;
481
		if (next == null) return false;
482
		return categories.indexOf(next.toLowerCase()) > categories.indexOf(old.toLowerCase());
483
	}
484

  
485
	private void parseLinks(final String record, final BlockingQueue<UpdateOperation> emitQueue) {
486
		// log.info("record: " + record);
487
		String recordId = null;
488
		String recordType = null;
489
		String relType = null;
490

  
491
		final List<ToEmit> toEmit = new ArrayList<>();
492
		final Map<String, String> params = new HashMap<>();
493

  
494
		// for persons:
495
		String firstName = null;
496
		String lastName = "";
497

  
498
		// for creations
499
		String title = null;
500
		String titleType = null;
501

  
502
		Stack<String> elementStack = new Stack<String>();
503
		elementStack.push("/");
504

  
505
		try {
506
			final XMLStreamReader parser = factory.get().createXMLStreamReader(new StreamSource(new StringReader(record)));
507
			while (parser.hasNext()) {
508
				int event = parser.next();
509

  
510
				if (event == XMLStreamConstants.END_ELEMENT) {
511
					pop(elementStack);
512
				} else if (event == XMLStreamConstants.START_ELEMENT) {
513
					final String localName = parser.getLocalName();
514
					push(elementStack, localName);
515

  
516
					if ("efgEntity".equalsIgnoreCase(localName)) {
517
						while (parser.hasNext()) {
518
							event = parser.next();
519

  
520
							if (event == XMLStreamConstants.START_ELEMENT) {
521
								recordType = parser.getLocalName();
522
								relType = normalizeRecordType(recordType);
523

  
524
								push(elementStack, recordType);
525
								break;
526
							}
527
						}
528
					}
529

  
530
					if ("identifier".equalsIgnoreCase(localName) && "efgEntity".equalsIgnoreCase(grandParent(elementStack))) {
531
						parser.next();
532
						recordId = parser.getText();
533
					} else if ("identifier".equalsIgnoreCase(localName)) {
534
						parser.next();
535
						// log.info("CCCCC: is an id but not the right depth: " + parser.getText());
536
						// log.info("stack " + elementStack);
537
					} else if ("person".equalsIgnoreCase(recordType)) {
538
						if ("name".equalsIgnoreCase(localName)) {
539
							final String part = parser.getAttributeValue(null, "part");
540
							parser.next();
541
							String value = parser.getText();
542
							if (value != null) {
543
								value = value.trim();
544
							}
545

  
546
							if ("forename".equalsIgnoreCase(part) || "Forename(s)".equalsIgnoreCase(part)) {
547
								firstName = value;
548
							} else if ("surname".equalsIgnoreCase(part) || "Family Name".equalsIgnoreCase(part)) {
549
								lastName = value;
550
							} else if ("stem".equalsIgnoreCase(part)) {
551
								lastName = value;
552
							}
553
						}
554
					} else if (isCreation(recordType)) {
555
						if ("title".equalsIgnoreCase(localName)) {
556
							String tmpTitle = null;
557
							String tmpType = null;
558

  
559
							while (parser.hasNext()) {
560
								event = parser.next();
561

  
562
								if (event == XMLStreamConstants.START_ELEMENT) {
563
									push(elementStack, parser.getLocalName());
564

  
565
									if ("text".equalsIgnoreCase(parser.getLocalName())) {
566
										event = parser.next();
567
										if (event == XMLStreamConstants.CHARACTERS) {
568
											tmpTitle = parser.getText();
569
										}
570
									} else if ("relation".equalsIgnoreCase(parser.getLocalName())) {
571
										event = parser.next();
572
										if (event == XMLStreamConstants.CHARACTERS) {
573
											tmpType = parser.getText();
574
										}
575
									}
576

  
577
								} else if (event == XMLStreamConstants.END_ELEMENT) {
578
									pop(elementStack);
579

  
580
									if ("title".equalsIgnoreCase(parser.getLocalName())) {
581
										tmpType = (tmpType == null) ? "n/a" : tmpType.trim();
582
										tmpTitle = (tmpTitle == null) ? "n/a" : tmpTitle.trim();
583

  
584
										if (moreSpecific(tmpType, titleType, titleTypes)) {
585
											title = tmpTitle;
586
											titleType = tmpType;
587
										}
588

  
589
										break;
590
									}
591
								}
592
							}
593
						}
594
					}
595

  
596
					if (relationTypes.contains(localName)) {
597
						ToEmit nextEmission = null;
598

  
599
						while (parser.hasNext()) {
600
							event = parser.next();
601

  
602
							if (event == XMLStreamConstants.START_ELEMENT) {
603
								push(elementStack, parser.getLocalName());
604

  
605
								if ("identifier".equalsIgnoreCase(parser.getLocalName())) {
606
									parser.next();
607

  
608
									// toEmit.add(new ToEmit(parser.getText().trim(), "n/a"));
609
									nextEmission = new ToEmit(parser.getText().trim(), "n/a");
610
									// break;
611
								} else if ("type".equalsIgnoreCase(parser.getLocalName())) {
612
									parser.next();
613

  
614
									nextEmission.setElementType(parser.getText().trim());
615
									toEmit.add(nextEmission);
616
									break;
617
								}
618
							} else if (event == XMLStreamConstants.END_ELEMENT) {
619
								pop(elementStack);
620
							}
621
						}
622
					}
623
				}
624
			}
625

  
626
			if ("person".equalsIgnoreCase(recordType)) {
627
				String personName = lastName;
628
				if (firstName != null) {
629
					personName = firstName + " " + lastName;
630
				}
631

  
632
				params.put("name", personName);
633
			} else if (isCreation(recordType) && (title != null)) {
634
				params.put("title", title);
635
			}
636

  
637
			if (toEmit.isEmpty() && log.isDebugEnabled()) {
638
				log.debug("emit queue is empty for record " + recordId.trim());
639
			}
640

  
641
			for (final ToEmit emitting : toEmit) {
642
				emitBacklink(emitQueue, emitting.targetId, recordId.trim(), relType, emitting.elementType, params);
643
			}
644

  
645
		} catch (final XMLStreamException e) {
646
			log.info("some parsing exception, moving along");
647
		}
648

  
649
	}
650

  
651
	private String grandParent(final Stack<String> elementStack) {
652
		if (elementStack.size() <= 3) return "";
653
		String res = elementStack.get(elementStack.size() - 3);
654
		return res;
655
	}
656

  
657
	private void pop(final Stack<String> elementStack) {
658
		elementStack.pop();
659
	}
660

  
661
	private void push(final Stack<String> elementStack, final String localName) {
662
		elementStack.push(localName);
663
	}
664

  
665
	private boolean isCreation(final String recordType) {
666
		return "avcreation".equalsIgnoreCase(recordType) || "nonavcreation".equalsIgnoreCase(recordType);
667
	}
668

  
669
	private String normalizeRecordType(final String localName) {
670
		final String relName = "rel" + localName;
671

  
672
		for (final String type : relationTypes) {
673
			if (relName.equalsIgnoreCase(type)) return type;
674
		}
675
		return relName;
676
	}
677

  
678
	private void emitBacklink(final BlockingQueue<UpdateOperation> emitQueue,
679
			final String source,
680
			final String target,
681
			final String targetType,
682
			final String elementType,
683
			final Map<String, String> params) {
684

  
685
		final DBObject pair = new BasicDBObject();
686
		pair.put("target", target);
687
		pair.put("type", targetType);
688
		pair.put("elementType", elementType);
689
		pair.putAll(params);
690

  
691
		final DBObject find = new BasicDBObject("id", source);
692

  
693
		final DBObject update = new BasicDBObject("$push", new BasicDBObject("links", pair));
694

  
695
		try {
696
			emitQueue.put(new UpdateOperation(find, update));
697
		} catch (final InterruptedException e) {
698
			throw new IllegalStateException(e);
699
		}
700
	}
701

  
702
	public List<String> getRelationTypes() {
703
		return relationTypes;
704
	}
705

  
706
	public void setRelationTypes(final List<String> relationTypes) {
707
		this.relationTypes = relationTypes;
708
	}
709

  
710
	public Resource getFixLinksXslt() {
711
		return fixLinksXslt;
712
	}
713

  
714
	@Required
715
	public void setFixLinksXslt(final Resource fixLinksXslt) {
716
		this.fixLinksXslt = fixLinksXslt;
717
	}
718

  
719
	public StringTemplate getBacklinkTemplate() {
720
		return backlinkTemplate;
721
	}
722

  
723
	@Required
724
	public void setBacklinkTemplate(final StringTemplate backlinkTemplate) {
725
		this.backlinkTemplate = backlinkTemplate;
726
	}
727

  
728
	public List<String> getTitleTypes() {
729
		return titleTypes;
730
	}
731

  
732
	public void setTitleTypes(final List<String> titleTypes) {
733
		this.titleTypes = titleTypes;
734
	}
735

  
736
	public Set<String> getEntityTypes() {
737
		return entityTypes;
738
	}
739

  
740
	public void setEntityTypes(final Set<String> entityTypes) {
741
		this.entityTypes = entityTypes;
742
	}
743

  
744
	public BacklinkTypeMatcher getBacklinkTypeMatcher() {
745
		return backlinkTypeMatcher;
746
	}
747

  
748
	@Required
749
	public void setBacklinkTypeMatcher(final BacklinkTypeMatcher backlinkTypeMatcher) {
750
		this.backlinkTypeMatcher = backlinkTypeMatcher;
751
	}
752

  
753
	public int getMAX_NUMBER_OF_RELS() {
754
		return MAX_NUMBER_OF_RELS;
755
	}
756

  
757
	@Required
758
	public void setMAX_NUMBER_OF_RELS(final int mAXNUMBEROFRELS) {
759
		MAX_NUMBER_OF_RELS = mAXNUMBEROFRELS;
760
	}
761

  
762
	public static class UpdateOperation {
763

  
764
		private DBObject find;
765
		private DBObject update;
766

  
767
		public UpdateOperation(final DBObject find, final DBObject update) {
768
			super();
769
			this.find = find;
770
			this.update = update;
771
		}
772

  
773
		public DBObject getFind() {
774
			return find;
775
		}
776

  
777
		public void setFind(final DBObject find) {
778
			this.find = find;
779
		}
780

  
781
		public DBObject getUpdate() {
782
			return update;
783
		}
784

  
785
		public void setUpdate(final DBObject update) {
786
			this.update = update;
787
		}
788
	}
789

  
790
	static class ToEmit {
791

  
792
		private String targetId;
793
		private String elementType;
794

  
795
		public ToEmit(final String targetId, final String elementType) {
796
			super();
797
			this.targetId = targetId;
798
			this.elementType = elementType;
799
		}
800

  
801
		public String getTargetId() {
802
			return targetId;
803
		}
804

  
805
		public void setTargetId(final String targetId) {
806
			this.targetId = targetId;
807
		}
808

  
809
		public String getElementType() {
810
			return elementType;
811
		}
812

  
813
		public void setElementType(final String elementType) {
814
			this.elementType = elementType;
815
		}
816
	}
817

  
818
}
modules/dnet-efg/trunk/src/main/resources/eu/dnetlib/efg/backlink/fixLinksEfg.xslt
1
<?xml version="1.0" encoding="UTF-8"?>
2
<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
3
                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:datetime="http://exslt.org/dates-and-times"
4
                xmlns:exslt="http://exslt.org/common" xmlns:xalan="http://xml.apache.org/xalan"
5
                xmlns:efg="http://www.europeanfilmgateway.eu/efg" version="1.0"
6
                exclude-result-prefixes="xsl datetime exslt xsi xalan">
7

  
8
	<!--	<xsl:param name="links" />  -->
9

  
10
	<xsl:output method="xml" indent="yes"/>
11
	<xsl:template match="/">
12
		<xsl:apply-templates select="*"/>
13
	</xsl:template>
14

  
15
	<xsl:template match="/record/links"/>
16

  
17
	<xsl:template match="efg:person|efg:avcreation|efg:nonavcreation">
18
		<xsl:copy>
19
			<xsl:apply-templates select="*|@*|text()"/>
20

  
21
			<xsl:for-each select="/record/links/link">
22
				<xsl:call-template name="buildRelation">
23
					<xsl:with-param name="type" select="@type"/>
24
					<xsl:with-param name="target" select="@target"/>
25
					<xsl:with-param name="name" select="@name"/>
26
					<xsl:with-param name="title" select="@title"/>
27
				</xsl:call-template>
28
			</xsl:for-each>
29

  
30
		</xsl:copy>
31
	</xsl:template>
32

  
33
	<xsl:template match="*|@*|text()">
34
		<xsl:copy>
35
			<xsl:apply-templates select="*|@*|text()"/>
36
		</xsl:copy>
37
	</xsl:template>
38

  
39
	<xsl:template name="buildRelation">
40
		<xsl:param name="type"/>
41
		<xsl:param name="target"/>
42
		<xsl:param name="name"/>
43
		<xsl:param name="title"/>
44

  
45
		<xsl:if test="not(/record/metadata/efg:efgEntity/efg:*/efg:*[local-name()=$type]/efg:identifier[normalize-space(text()) = $target])">
46
			<xsl:element name="efg:{$type}">
47

  
48
				<!-- HACK michele -->
49
				<xsl:if test="($type = 'relAvCreation') or ($type = 'relNonAVCreation')">
50
					<efg:hasItems>true</efg:hasItems>
51
				</xsl:if>
52
				<!-- FINE HACK -->
53

  
54
				<efg:identifier>
55
					<xsl:value-of select="$target"/>
56
				</efg:identifier>
57
				<xsl:if test="$name">
58
					<efg:name>
59
						<xsl:value-of select="$name"/>
60
					</efg:name>
61
				</xsl:if>
62
				<xsl:if test="$title">
63
					<efg:title>
64
						<xsl:value-of select="$title"/>
65
					</efg:title>
66
				</xsl:if>
67
			</xsl:element>
68
		</xsl:if>
69

  
70
	</xsl:template>
71

  
72
</xsl:stylesheet>
modules/dnet-efg/trunk/src/main/resources/eu/dnetlib/efg/backlink/backlink.st
1
<efg:$relType$ xmlns:efg="http://www.europeanfilmgateway.eu/efg" generated="true">
2
 <efg:identifier>$targetId$</efg:identifier>
3
 $if(title)$<efg:title>$title$</efg:title>$endif$
4
 $if(name)$<efg:name>$name$</efg:name>$endif$
5
 <efg:type>$if(type)$$type$$else$n/a$endif$</efg:type>
6
 $itemTypes:{<efg:itemType>$it$</efg:itemType>}$
7
</efg:$relType$>
modules/dnet-efg/trunk/src/main/resources/eu/dnetlib/efg/backlink/applicationContext-backlink.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2

  
3
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4
       xmlns:p="http://www.springframework.org/schema/p" xmlns="http://www.springframework.org/schema/beans"
5
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
6

  
7

  
8
	<bean id="backLinkPlugin" class="efg.backlink.BackLinkPlugin">
9

  
10
	</bean>
11

  
12
	<bean id="backlinkBuilder"
13
	      class="efg.backlink.MongoBuildBacklinks"
14
	      p:fixLinksXslt="classpath:/eu/dnetlib/efg/backlink/fixLinksEfg.xslt"
15
	      p:backlinkTemplate-ref="backlinkTemplate"
16
	      p:MAX_NUMBER_OF_RELS="${services.mdstore.backlinks.maxrels}">
17
		<property name="backlinkTypeMatcher">
18
			<bean class="efg.backlink.PropertyBacklinkTypeMatcherImpl">
19
				<constructor-arg index="0" value="${services.mdstore.backlinks.typerules}"/>
20
			</bean>
21
		</property>
22
		<property name="relationTypes">
23
			<list>
24
				<value>relAvCreation</value>
25
				<value>relCorporate</value>
26
				<value>relNonAVCreation</value>
27
				<value>relPerson</value>
28
				<value>relCollection</value>
29
				<value>relEvent</value>
30
			</list>
31
		</property>
32
		<property name="entityTypes">
33
			<set>
34
				<value>person</value>
35
				<value>avcreation</value>
36
				<value>nonavcreation</value>
37
				<value>corporate</value>
38
				<value>group</value>
39
				<value>collection</value>
40
				<value>decisionEvent</value>
41
				<value>publicationEvent</value>
42
				<value>productionEvent</value>
43
				<value>award</value>
44
				<value>iprEvent</value>
45
			</set>
46
		</property>
47
		<property name="titleTypes">
48
			<!-- REMEMBER TO INSERT VALUES IN LOWER CASE -->
49
			<list>
50
				<value>series title</value>
51
				<value>episode title</value>
52
				<value>main title</value>
53
				<value>original title</value>
54
			</list>
55
		</property>
56
	</bean>
57

  
58
	<bean id="backlinkTemplate"
59
	      class="eu.dnetlib.springutils.stringtemplate.StringTemplateFactory"
60
	      p:template="classpath:/eu/dnetlib/efg/backlink/backlink.st"
61
	      scope="prototype"/>
62

  
63
</beans>
modules/dnet-efg/trunk/src/main/java/eu/dnetlib/efg/backlinks/BackLinksPlugin.java
1
package eu.dnetlib.efg.backlinks;
2

  
3
import java.util.Map;
4

  
5
import org.springframework.beans.factory.annotation.Autowired;
6

  
7
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
8
import eu.dnetlib.data.mdstore.modular.action.MDStorePlugin;
9
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
10
import eu.dnetlib.rmi.data.MDStoreServiceException;
11

  
12
/**
13
 * Created by Sandro La Bruzzo on 12/2/15.
14
 */
15

  
16
public class BackLinksPlugin implements MDStorePlugin {
17

  
18
	@Autowired
19
	private MongoBuildBacklinks mongoBuildBacklinks;
20

  
21
	@Override
22
	public void run(final MDStoreDao dao, final Map<String, String> params, final DoneCallback doneCallback) throws MDStoreServiceException {
23

  
24
		final String id = params.get("mdStoreId");
25
		this.mongoBuildBacklinks.process(id, doneCallback);
26

  
27
	}
28
}
modules/dnet-efg/trunk/src/main/java/eu/dnetlib/efg/backlinks/MongoBuildBacklinks.java
1
package eu.dnetlib.efg.backlinks;
2

  
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.*;
6
import java.util.concurrent.ArrayBlockingQueue;
7
import java.util.concurrent.BlockingQueue;
8
import java.util.concurrent.ThreadPoolExecutor;
9
import java.util.concurrent.TimeUnit;
10
import javax.xml.namespace.QName;
11
import javax.xml.parsers.DocumentBuilder;
12
import javax.xml.parsers.DocumentBuilderFactory;
13
import javax.xml.parsers.ParserConfigurationException;
14
import javax.xml.stream.*;
15
import javax.xml.stream.events.EndElement;
16
import javax.xml.stream.events.StartElement;
17
import javax.xml.stream.events.XMLEvent;
18
import javax.xml.transform.*;
19
import javax.xml.transform.dom.DOMSource;
20
import javax.xml.transform.stream.StreamResult;
21
import javax.xml.transform.stream.StreamSource;
22

  
23
import com.mongodb.BasicDBObject;
24
import com.mongodb.DBObject;
25
import com.mongodb.client.MongoCollection;
26
import com.mongodb.client.MongoDatabase;
27
import com.mongodb.client.model.Filters;
28
import com.mongodb.client.model.UpdateOptions;
29
import eu.dnetlib.data.mdstore.modular.action.DoneCallback;
30
import eu.dnetlib.data.mdstore.modular.mongodb.MDStoreTransactionManagerImpl;
31
import eu.dnetlib.rmi.data.MDStoreServiceException;
32
import org.antlr.stringtemplate.StringTemplate;
33
import org.apache.commons.lang3.StringEscapeUtils;
34
import org.apache.commons.logging.Log;
35
import org.apache.commons.logging.LogFactory;
36
import org.bson.conversions.Bson;
37
import org.springframework.beans.factory.annotation.Autowired;
38
import org.springframework.beans.factory.annotation.Required;
39
import org.springframework.core.io.Resource;
40
import org.w3c.dom.Document;
41
import org.w3c.dom.Element;
42
import org.w3c.dom.Node;
43

  
44
/**
45
 * handle backlinks
46
 * <p/>
47
 * AvCreation Collection Corporate NonAVCreation Person
48
 *
49
 * @author marko
50
 */
51
public class MongoBuildBacklinks {
52

  
53
	private static final Log log = LogFactory.getLog(MongoBuildBacklinks.class); // NOPMD by marko on 11/24/08 5:02 PM
54
	private final int queueSize = 80;
55
	private final int cpus = Runtime.getRuntime().availableProcessors();
56
	private final ThreadLocal<XMLInputFactory> factory = new ThreadLocal<XMLInputFactory>() {
57

  
58
		@Override
59
		protected XMLInputFactory initialValue() {
60
			return XMLInputFactory.newInstance();
61
		}
62
	};
63
	private final ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
64

  
65
		@Override
66
		protected XMLOutputFactory initialValue() {
67
			return XMLOutputFactory.newInstance();
68
		}
69
	};
70
	private final ThreadLocal<Transformer> serializer = new ThreadLocal<Transformer>() {
71

  
72
		@Override
73
		protected Transformer initialValue() {
74
			final TransformerFactory factory = TransformerFactory.newInstance();
75
			try {
76
				final Transformer trans = factory.newTransformer();
77
				trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
78
				return trans;
79
			} catch (final TransformerConfigurationException e) {
80
				throw new IllegalStateException(e);
81
			}
82
		}
83
	};
84
	private final ThreadLocal<DocumentBuilder> docBuilder = new ThreadLocal<DocumentBuilder>() {
85

  
86
		@Override
87
		protected DocumentBuilder initialValue() {
88
			final DocumentBuilderFactory dbfac = DocumentBuilderFactory.newInstance();
89
			try {
90
				return dbfac.newDocumentBuilder();
91
			} catch (final ParserConfigurationException e) {
92
				throw new IllegalStateException(e);
93
			}
94
		}
95
	};
96
	public UpdateOperation sentinel = new UpdateOperation(null, null);
97
	@Autowired
98
	private MDStoreTransactionManagerImpl transactionManager;
99
	private BacklinkTypeMatcher backlinkTypeMatcher;
100
	private int MAX_NUMBER_OF_RELS = 1000;
101
	private List<String> relationTypes = new ArrayList<>();
102
	private Resource fixLinksXslt;
103
	private StringTemplate backlinkTemplate;
104
	private Set<String> entityTypes;
105
	private List<String> titleTypes;
106

  
107
	public void process(final String mdId, final DoneCallback doneCallback) throws MDStoreServiceException {
108
		try {
109
			log.debug("generating backlink for mdstore ID:" + mdId);
110

  
111
			ThreadPoolExecutor executor = createExecutor();
112

  
113
			final String internalId = transactionManager.readMdStore(mdId);
114

  
115
			final MongoDatabase db = transactionManager.getDb();
116

  
117
			final MongoCollection<DBObject> backlinks = db.getCollection(mdId + "backlinks", DBObject.class);
118

  
119
			backlinks.deleteMany(new BasicDBObject());
120

  
121
			final BlockingQueue<UpdateOperation> emitQueue = new ArrayBlockingQueue<>(500);
122

  
123
			final MongoCollection<DBObject> data = db.getCollection(internalId, DBObject.class);
124

  
125
			final long size = data.count();
126
			double currentPosition = 0;
127

  
128
			backlinks.drop();
129
			backlinks.createIndex(new BasicDBObject("id", 1));
130
			data.createIndex(new BasicDBObject("id", 1));
131
			data.createIndex(new BasicDBObject("originalId", 1));
132

  
133
			log.info("start background updating thread");
134
			final Thread backgroundUpdating = startBackgroundUpdating(emitQueue, backlinks);
135

  
136
			log.info("start parsing links");
137
			currentPosition = parseLinks(currentPosition, executor, data, emitQueue);
138

  
139
			executor.shutdown();
140
			executor.awaitTermination(10, TimeUnit.MINUTES);
141
			log.info("finish parallel part: grouping " + executor.isShutdown());
142

  
143
			emitQueue.put(sentinel);
144
			backgroundUpdating.join();
145

  
146
			log.info("finish background backlinks storing");
147

  
148
			executor = createExecutor();
149
			log.info("new executor " + executor.isShutdown());
150

  
151
			fixLinks(data, size, backlinks, executor, currentPosition);
152

  
153
			log.info("waiting for other executor");
154
			executor.shutdown();
155
			executor.awaitTermination(10, TimeUnit.MINUTES);
156
			log.info("finish parallel part: fixing");
157

  
158
			backlinks.createIndex(new BasicDBObject("id", 1));
159
			data.createIndex(new BasicDBObject("id", 1));
160
			data.createIndex(new BasicDBObject("originalId", 1));
161
			Map<String, String> paramsOut = new HashMap<>();
162
			doneCallback.call(paramsOut);
163

  
164
		} catch (final InterruptedException e) {
165
			throw new IllegalStateException(e);
166
		}
167
	}
168

  
169
	private void fixLinks(final MongoCollection<DBObject> data,
170
			final long size,
171
			final MongoCollection<DBObject> backlinks,
172
			final ThreadPoolExecutor executor,
173
			double currentPosition
174
	) {
175
		final double step = (1.0 * size) / backlinks.count();
176

  
177
		log.info("FIXLINKS");
178

  
179
		for (final DBObject link : backlinks.find()) {
180

  
181
			log.debug("FIXING LINK " + link);
182
			try {
183
				executor.submit((Runnable) () -> {
184
					try {
185
						@SuppressWarnings("unchecked")
186
						List<DBObject> links = (List<DBObject>) link.get("links");
187
						fixLinks(data, (String) link.get("id"), links);
188
					} catch (final Throwable e) {
189
						log.warn("problems fixing", e);
190
					}
191
				});
192
				currentPosition += step;
193
			} catch (final Throwable e) {
194
				log.warn("problems fixing", e);
195
			}
196
		}
197

  
198
		log.info("FINISH FIXLINKS");
199
	}
200

  
201
	private double parseLinks(double currentPosition,
202
			final ThreadPoolExecutor executor,
203
			final MongoCollection<DBObject> data,
204
			final BlockingQueue<UpdateOperation> emitQueue) {
205
		// log.info("PARSING LINKS " + collection);
206
		for (final DBObject current : data.find()) {
207
			try {
208
				// log.info("PARSING LINK " + current.get("originalId"));
209
				executor.submit((Runnable) () -> parseLinks((String) current.get("body"), emitQueue));
210
				currentPosition++;
211
			} catch (final Throwable e) {
212
				log.warn("problems parsing", e);
213
			}
214
		}
215
		return currentPosition;
216
	}
217

  
218
	private Thread startBackgroundUpdating(final BlockingQueue<UpdateOperation> queue, final MongoCollection<DBObject> coll) {
219
		final Thread background = new Thread(() -> {
220
			while (true) {
221
				try {
222
					final UpdateOperation record = queue.take();
223
					if (record == sentinel) {
224
						break;
225
					}
226

  
227
					final UpdateOptions op = new UpdateOptions();
228
					op.upsert(true);
229
					coll.updateOne((Bson) record.find, (Bson) record.update, op);
230
				} catch (final InterruptedException e) {
231
					log.fatal("got exception in background thread", e);
232
					throw new IllegalStateException(e);
233
				}
234
			}
235
		});
236
		background.start();
237
		return background;
238
	}
239

  
240
	protected ThreadPoolExecutor createExecutor() {
241
		return new ThreadPoolExecutor(cpus, cpus, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize, true),
242
				new ThreadPoolExecutor.CallerRunsPolicy());
243
	}
244

  
245
	private void fixLinks(final MongoCollection<DBObject> collection, final String id, final List<DBObject> links) {
246

  
247
		final Bson query = Filters.eq("originalId", id);
248
		final BasicDBObject source = (BasicDBObject) collection.find(query).first();
249

  
250
		if (source == null) {
251
			log.warn("object with originalId " + id + " doesn't exist or doesn't belong to this collection");
252
			return;
253
		}
254

  
255
		final String origBody = (String) source.get("body");
256
		log.info("UPDATING LINKS " + id);
257
		final String updated = updateLinks(id, origBody, links);
258
		// for (DBObject link : links) {
259
		// log.info(" link from " + id + " to: " + link.get("target"));
260
		// }
261

  
262
		if (!origBody.equalsIgnoreCase(updated)) {
263

  
264
			collection.updateOne(query, new BasicDBObject("$set", new BasicDBObject("body", updated)));
265
		}
266
	}
267

  
268
	public boolean isEfgEntity(final StartElement element) {
269
		return isEfgEntity(element.getName());
270
	}
271

  
272
	public boolean isEfgEntity(final EndElement element) {
273
		return isEfgEntity(element.getName());
274
	}
275

  
276
	public boolean isEfgEntity(final QName name) {
277
		return entityTypes.contains(name.getLocalPart());
278
	}
279

  
280
	public boolean isRelation(final XMLEvent event) {
281
		if (event.isStartElement()) return isRelation(event.asStartElement());
282
		return false;
283
	}
284

  
285
	public boolean isEndRelation(final XMLEvent event) {
286
		if (event.isEndElement()) return isRelation(event.asEndElement());
287
		return false;
288
	}
289

  
290
	public boolean isRelation(final StartElement element) {
291
		return isRelation(element.getName());
292
	}
293

  
294
	public boolean isRelation(final EndElement element) {
295
		return isRelation(element.getName());
296
	}
297

  
298
	public boolean isRelation(final QName name) {
299
		return name.getLocalPart().startsWith("rel");
300
	}
301

  
302
	public XMLEventReader fragment(final String source) throws XMLStreamException {
303
		return fragment(factory.get().createXMLEventReader(new StringReader(source)));
304
	}
305

  
306
	public XMLEventReader fragment(final XMLEventReader reader) throws XMLStreamException {
307
		return factory.get().createFilteredReader(reader, new EventFilter() {
308

  
309
			@Override
310
			public boolean accept(final XMLEvent evt) {
311
				return !(evt.isStartDocument() || evt.isEndDocument());
312
			}
313
		});
314
	}
315

  
316
	public String linkToFragment(final DBObject link) {
317
		StringTemplate template = new StringTemplate(backlinkTemplate.getTemplate());
318

  
319
		template.setAttribute("relType", prepareValueForTemplate(link.get("type")));
320
		template.setAttribute("targetId", prepareValueForTemplate(link.get("target")));
321
		template.setAttribute("title", prepareValueForTemplate(link.get("title")));
322
		template.setAttribute("name", prepareValueForTemplate(link.get("name")));
323
		template.setAttribute("type", prepareValueForTemplate(backlinkTypeMatcher.getInverseType(link.get("elementType").toString())));
324
		template.setAttribute("itemTypes", prepareValueForTemplate(link.get("itemTypes")));
325

  
326
		String res = template.toString();
327

  
328
		if (log.isDebugEnabled()) {
329
			log.debug("UPDATING adding link: " + res.replaceAll("\n", ""));
330
		}
331

  
332
		return res;
333
	}
334

  
335
	private Object prepareValueForTemplate(final Object obj) {
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff