Project

General

Profile

1
package eu.dnetlib.functionality.index.solr.feed;
2

    
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.HashMap;
6
import java.util.Iterator;
7
import java.util.List;
8
import javax.xml.stream.*;
9
import javax.xml.stream.events.Namespace;
10
import javax.xml.stream.events.StartElement;
11
import javax.xml.stream.events.XMLEvent;
12

    
13
import com.google.common.collect.Lists;
14
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer.Mode;
15
import org.apache.solr.common.SolrInputDocument;
16

    
17
/**
18
 * Optimized version of the document parser, drop in replacement of InputDocumentFactory.
19
 *
20
 * <p>
21
 * Faster because:
22
 * </p>
23
 * <ul>
24
 * <li>Doesn't create a DOM for the full document</li>
25
 * <li>Doesn't execute xpaths agains the DOM</li>
26
 * <li>Quickly serialize the 'result' element directly in a string.</li>
27
 * <li>Uses less memory: less pressure on GC and allows more threads to process this in parallel</li>
28
 * </ul>
29
 *
30
 * <p>
31
 * This class is fully reentrant and can be invoked in parallel.
32
 * </p>
33
 *
34
 * @author marko
35
 *
36
 */
37
public class StreamingInputDocumentFactory extends InputDocumentFactory {
38

    
39
	protected static final String DEFAULTDNETRESULT = "dnetResult";
40

    
41
	protected static final String TARGETFIELDS = "targetFields";
42

    
43
	protected static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier";
44

    
45
	protected static final String ROOT_ELEMENT = "indexRecord";
46

    
47
	protected static final int MAX_FIELD_LENGTH = 25000;
48

    
49
	protected ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
50

    
51
		@Override
52
		protected XMLInputFactory initialValue() {
53
			return XMLInputFactory.newInstance();
54
		}
55
	};
56

    
57
	protected ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
58

    
59
		@Override
60
		protected XMLOutputFactory initialValue() {
61
			return XMLOutputFactory.newInstance();
62
		}
63
	};
64

    
65
	protected ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
66

    
67
		@Override
68
		protected XMLEventFactory initialValue() {
69
			return XMLEventFactory.newInstance();
70
		}
71
	};
72

    
73
	@Override
74
	public SolrInputDocument parseDocument(final String version, final String inputDocument, final String dsId, final String resultName)
75
			throws XMLStreamException {
76
		return parseDocument(version, inputDocument, dsId, resultName, null);
77
	}
78

    
79
	@Override
80
	public SolrInputDocument parseDocument(final String version,
81
			final String inputDocument,
82
			final String dsId,
83
			final String resultName,
84
			final ResultTransformer resultTransformer) {
85

    
86
		final StringWriter results = new StringWriter();
87
		final List<Namespace> nsList = Lists.newLinkedList();
88
		try {
89

    
90
			XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
91

    
92
			final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>());
93

    
94
			while (parser.hasNext()) {
95
				final XMLEvent event = parser.nextEvent();
96
				if ((event != null) && event.isStartElement()) {
97
					final String localName = event.asStartElement().getName().getLocalPart();
98

    
99
					if (ROOT_ELEMENT.equals(localName)) {
100
						nsList.addAll(getNamespaces(event));
101
					} else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) {
102
						final XMLEvent text = parser.nextEvent();
103
						String recordId = getText(text);
104
						indexDocument.addField(INDEX_RECORD_ID, recordId);
105
					} else if (TARGETFIELDS.equals(localName)) {
106
						parseTargetFields(indexDocument, parser);
107
					} else if (resultName.equals(localName)) {
108
						if (resultTransformer == null || !(Mode.empty.equals(resultTransformer.getMode()))) {
109
							copyResult(indexDocument, results, parser, nsList, resultName, resultTransformer);
110
						}
111
					}
112
				}
113
			}
114

    
115

    
116
			if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
117
				indexDocument.clear();
118
				System.err.println("missing indexrecord id:\n" + inputDocument);
119
			}
120

    
121
			return indexDocument;
122
		} catch (XMLStreamException e) {
123
			return new SolrInputDocument();
124
		}
125
	}
126

    
127
	private List<Namespace> getNamespaces(final XMLEvent event) {
128
		final List<Namespace> res = Lists.newLinkedList();
129
		@SuppressWarnings("unchecked")
130
		Iterator<Namespace> nsIter = event.asStartElement().getNamespaces();
131
		while (nsIter.hasNext()) {
132
			Namespace ns = nsIter.next();
133
			res.add(ns);
134
		}
135
		return res;
136
	}
137

    
138
	/**
139
	 * Parse the targetFields block and add fields to the solr document.
140
	 *
141
	 * @param indexDocument
142
	 * @param parser
143
	 * @throws XMLStreamException
144
	 */
145
	protected void parseTargetFields(final SolrInputDocument indexDocument, final XMLEventReader parser) throws XMLStreamException {
146

    
147
		boolean hasFields = false;
148

    
149
		while (parser.hasNext()) {
150
			final XMLEvent targetEvent = parser.nextEvent();
151
			if (targetEvent.isEndElement() && targetEvent.asEndElement().getName().getLocalPart().equals(TARGETFIELDS)) {
152
				break;
153
			}
154

    
155
			if (targetEvent.isStartElement()) {
156
				final String fieldName = targetEvent.asStartElement().getName().getLocalPart();
157
				final XMLEvent text = parser.nextEvent();
158

    
159
				String data = getText(text);
160

    
161
				addField(indexDocument, fieldName, data);
162
				hasFields = true;
163
			}
164
		}
165

    
166
		if (!hasFields) {
167
			indexDocument.clear();
168
		}
169
	}
170

    
171
	/**
172
	 * Copy the /indexRecord/result element and children, preserving namespace declarations etc.
173
	 *
174
	 * @param indexDocument
175
	 * @param results
176
	 * @param parser
177
	 * @param nsList
178
	 * @throws XMLStreamException
179
	 */
180
	protected void copyResult(final SolrInputDocument indexDocument,
181
			final StringWriter results,
182
			final XMLEventReader parser,
183
			final List<Namespace> nsList,
184
			final String dnetResult,
185
			final ResultTransformer resultTransformer) throws XMLStreamException {
186
		final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results);
187

    
188
		for (Namespace ns : nsList) {
189
			eventFactory.get().createNamespace(ns.getPrefix(), ns.getNamespaceURI());
190
		}
191

    
192
		StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator());
193

    
194
		// new root record
195
		writer.add(newRecord);
196

    
197
		// copy the rest as it is
198
		while (parser.hasNext()) {
199
			final XMLEvent resultEvent = parser.nextEvent();
200

    
201
			// TODO: replace with depth tracking instead of close tag tracking.
202
			if (resultEvent.isEndElement() && resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) {
203
				writer.add(eventFactory.get().createEndElement("", null, RESULT));
204
				break;
205
			}
206

    
207
			writer.add(resultEvent);
208
		}
209
		writer.close();
210

    
211
		if (resultTransformer != null) {
212
			indexDocument.addField(INDEX_RESULT, resultTransformer.apply(results.toString()));
213
		} else {
214
			indexDocument.addField(INDEX_RESULT, results.toString());
215
		}
216
	}
217

    
218
	/**
219
	 * Helper used to add a field to a solr doc. It avoids to add empy fields
220
	 *
221
	 * @param indexDocument
222
	 * @param field
223
	 * @param value
224
	 */
225
	private final void addField(final SolrInputDocument indexDocument, final String field, final String value) {
226
		String cleaned = value.trim();
227
		if (!cleaned.isEmpty()) {
228
			// log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n");
229
			indexDocument.addField(field.toLowerCase(), cleaned);
230
		}
231
	}
232

    
233
	/**
234
	 * Helper used to get the string from a text element.
235
	 *
236
	 * @param text
237
	 * @return the element text value
238
	 */
239
	protected final String getText(final XMLEvent text) {
240
		if (text.isEndElement()) // log.warn("skipping because isEndOfElement " + text.asEndElement().getName().getLocalPart());
241
			return "";
242

    
243
		final String data = text.asCharacters().getData();
244
		if (data != null && data.length() > MAX_FIELD_LENGTH) {
245
			return data.substring(0, MAX_FIELD_LENGTH);
246
		}
247

    
248
		return data;
249
	}
250

    
251
}
(3-3/3)