Project

General

Profile

1
package eu.dnetlib.functionality.index.solr.feed;
2

    
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.HashMap;
6
import java.util.Iterator;
7
import java.util.List;
8
import javax.xml.stream.*;
9
import javax.xml.stream.events.Namespace;
10
import javax.xml.stream.events.StartElement;
11
import javax.xml.stream.events.XMLEvent;
12

    
13
import com.google.common.collect.Lists;
14
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer.Mode;
15
import org.apache.solr.common.SolrInputDocument;
16
import org.apache.solr.common.StringUtils;
17

    
18
/**
19
 * Optimized version of the document parser, drop in replacement of InputDocumentFactory.
20
 *
21
 * <p>
22
 * Faster because:
23
 * </p>
24
 * <ul>
25
 * <li>Doesn't create a DOM for the full document</li>
26
 * <li>Doesn't execute xpaths agains the DOM</li>
27
 * <li>Quickly serialize the 'result' element directly in a string.</li>
28
 * <li>Uses less memory: less pressure on GC and allows more threads to process this in parallel</li>
29
 * </ul>
30
 *
31
 * <p>
32
 * This class is fully reentrant and can be invoked in parallel.
33
 * </p>
34
 *
35
 * @author marko
36
 *
37
 */
38
public class StreamingInputDocumentFactory extends InputDocumentFactory {
39

    
40
	protected static final String DEFAULTDNETRESULT = "dnetResult";
41

    
42
	protected static final String TARGETFIELDS = "targetFields";
43

    
44
	protected static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier";
45

    
46
	protected static final String ROOT_ELEMENT = "indexRecord";
47

    
48
	protected static final int MAX_FIELD_LENGTH = 32765;
49

    
50
	protected ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
51

    
52
		@Override
53
		protected XMLInputFactory initialValue() {
54
			return XMLInputFactory.newInstance();
55
		}
56
	};
57

    
58
	protected ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
59

    
60
		@Override
61
		protected XMLOutputFactory initialValue() {
62
			return XMLOutputFactory.newInstance();
63
		}
64
	};
65

    
66
	protected ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
67

    
68
		@Override
69
		protected XMLEventFactory initialValue() {
70
			return XMLEventFactory.newInstance();
71
		}
72
	};
73

    
74
	/**
75
	 * {@inheritDoc}
76
	 *
77
	 * @see eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory#parseDocument(eu.dnetlib.functionality.index.solr.feed.IndexDocument,
78
	 * java.lang.String)
79
	 */
80
	@Override
81
	public SolrInputDocument parseDocument(final String version, final String inputDocument, final String dsId, final String resultName)
82
			throws XMLStreamException {
83
		return parseDocument(version, inputDocument, dsId, resultName, null);
84
	}
85

    
86
	/**
87
	 * {@inheritDoc}
88
	 *
89
	 * @see eu.dnetlib.functionality.index.solr.feed.InputDocumentFactory#parseDocument(eu.dnetlib.functionality.index.solr.feed.IndexDocument,
90
	 *      java.lang.String, com.google.common.base.Function)
91
	 */
92
	@Override
93
	public SolrInputDocument parseDocument(final String version,
94
			final String inputDocument,
95
			final String dsId,
96
			final String resultName,
97
			final ResultTransformer resultTransformer) {
98

    
99
		final StringWriter results = new StringWriter();
100
		final List<Namespace> nsList = Lists.newLinkedList();
101
		try {
102

    
103
			XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
104

    
105
			final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>());
106

    
107
			while (parser.hasNext()) {
108
				final XMLEvent event = parser.nextEvent();
109
				if ((event != null) && event.isStartElement()) {
110
					final String localName = event.asStartElement().getName().getLocalPart();
111

    
112
					if (ROOT_ELEMENT.equals(localName)) {
113
						nsList.addAll(getNamespaces(event));
114
					} else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) {
115
						final XMLEvent text = parser.nextEvent();
116
						String recordId = getText(text);
117
						indexDocument.addField(INDEX_RECORD_ID, recordId);
118
					} else if (TARGETFIELDS.equals(localName)) {
119
						parseTargetFields(indexDocument, parser);
120
					} else if (resultName.equals(localName)) {
121
						if (resultTransformer == null || !(Mode.empty.equals(resultTransformer.getMode()))) {
122
							copyResult(indexDocument, results, parser, nsList, resultName, resultTransformer);
123
						}
124
					}
125
				}
126
			}
127

    
128
			if (version != null) {
129
				indexDocument.addField(DS_VERSION, version);
130
			}
131

    
132
			if (dsId != null) {
133
				indexDocument.addField(DS_ID, dsId);
134
			}
135

    
136
			if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
137
				indexDocument.clear();
138
				System.err.println("missing indexrecord id:\n" + inputDocument);
139
			}
140

    
141
			return indexDocument;
142
		} catch (XMLStreamException e) {
143
			return new SolrInputDocument();
144
		}
145
	}
146

    
147
	private List<Namespace> getNamespaces(final XMLEvent event) {
148
		final List<Namespace> res = Lists.newLinkedList();
149
		@SuppressWarnings("unchecked")
150
		Iterator<Namespace> nsIter = event.asStartElement().getNamespaces();
151
		while (nsIter.hasNext()) {
152
			Namespace ns = nsIter.next();
153
			res.add(ns);
154
		}
155
		return res;
156
	}
157

    
158
	/**
159
	 * Parse the targetFields block and add fields to the solr document.
160
	 *
161
	 * @param indexDocument
162
	 * @param parser
163
	 * @throws XMLStreamException
164
	 */
165
	protected void parseTargetFields(final SolrInputDocument indexDocument, final XMLEventReader parser) throws XMLStreamException {
166

    
167
		boolean hasFields = false;
168

    
169
		while (parser.hasNext()) {
170
			final XMLEvent targetEvent = parser.nextEvent();
171
			if (targetEvent.isEndElement() && targetEvent.asEndElement().getName().getLocalPart().equals(TARGETFIELDS)) {
172
				break;
173
			}
174

    
175
			if (targetEvent.isStartElement()) {
176
				final String fieldName = targetEvent.asStartElement().getName().getLocalPart();
177
				final XMLEvent text = parser.nextEvent();
178

    
179
				String data = getText(text);
180

    
181
				addField(indexDocument, fieldName, data);
182
				hasFields = true;
183
			}
184
		}
185

    
186
		if (!hasFields) {
187
			indexDocument.clear();
188
		}
189
	}
190

    
191
	/**
192
	 * Copy the /indexRecord/result element and children, preserving namespace declarations etc.
193
	 *
194
	 * @param indexDocument
195
	 * @param results
196
	 * @param parser
197
	 * @param nsList
198
	 * @throws XMLStreamException
199
	 */
200
	protected void copyResult(final SolrInputDocument indexDocument,
201
			final StringWriter results,
202
			final XMLEventReader parser,
203
			final List<Namespace> nsList,
204
			final String dnetResult,
205
			final ResultTransformer resultTransformer) throws XMLStreamException {
206
		final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results);
207

    
208
		for (Namespace ns : nsList) {
209
			eventFactory.get().createNamespace(ns.getPrefix(), ns.getNamespaceURI());
210
		}
211

    
212
		StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator());
213

    
214
		// new root record
215
		writer.add(newRecord);
216

    
217
		// copy the rest as it is
218
		while (parser.hasNext()) {
219
			final XMLEvent resultEvent = parser.nextEvent();
220

    
221
			// TODO: replace with depth tracking instead of close tag tracking.
222
			if (resultEvent.isEndElement() && resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) {
223
				writer.add(eventFactory.get().createEndElement("", null, RESULT));
224
				break;
225
			}
226

    
227
			writer.add(resultEvent);
228
		}
229
		writer.close();
230

    
231
		if (resultTransformer != null) {
232
			indexDocument.addField(INDEX_RESULT, resultTransformer.apply(results.toString()));
233
		} else {
234
			indexDocument.addField(INDEX_RESULT, results.toString());
235
		}
236
	}
237

    
238
	/**
239
	 * Helper used to add a field to a solr doc. It avoids to add empy fields
240
	 *
241
	 * @param indexDocument
242
	 * @param field
243
	 * @param value
244
	 */
245
	private final void addField(final SolrInputDocument indexDocument, final String field, final String value) {
246
		String cleaned = value.trim();
247
		if (!cleaned.isEmpty()) {
248
			// log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n");
249
			indexDocument.addField(field.toLowerCase(), cleaned);
250
		}
251
	}
252

    
253
	/**
254
	 * Helper used to get the string from a text element.
255
	 *
256
	 * @param text
257
	 * @return
258
	 */
259
	protected final String getText(final XMLEvent text) {
260
		if (text.isEndElement()) // log.warn("skipping because isEndOfElement " + text.asEndElement().getName().getLocalPart());
261
			return "";
262

    
263
		final String data = text.asCharacters().getData();
264
		if (data != null && data.length() > MAX_FIELD_LENGTH) {
265
			return data.substring(0, MAX_FIELD_LENGTH);
266
		}
267

    
268
		return data;
269
	}
270

    
271
}
(3-3/3)