Project

General

Profile

1
package eu.dnetlib.index.solr.feed;
2

    
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.Iterator;
6
import java.util.LinkedList;
7
import java.util.List;
8
import javax.xml.stream.*;
9
import javax.xml.stream.events.Namespace;
10
import javax.xml.stream.events.StartElement;
11
import javax.xml.stream.events.XMLEvent;
12

    
13
import eu.dnetlib.index.solr.feed.ResultTransformer.Mode;
14
import org.apache.solr.common.SolrInputDocument;
15

    
16
/**
17
 * Optimized version of the document parser, drop in replacement of InputDocumentFactory.
18
 * <p>
19
 * <p>
20
 * Faster because:
21
 * </p>
22
 * <ul>
23
 * <li>Doesn't create a DOM for the full document</li>
24
 * <li>Doesn't execute xpaths agains the DOM</li>
25
 * <li>Quickly serialize the 'result' element directly in a string.</li>
26
 * <li>Uses less memory: less pressure on GC and allows more threads to process this in parallel</li>
27
 * </ul>
28
 * <p>
29
 * <p>
30
 * This class is fully reentrant and can be invoked in parallel.
31
 * </p>
32
 *
33
 * @author marko
34
 */
35

    
36
public class StreamingInputDocumentFactory extends InputDocumentFactory {
37

    
38
	protected static final String DEFAULTDNETRESULT = "dnetResult";
39

    
40
	protected static final String TARGETFIELDS = "targetFields";
41

    
42
	protected static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier";
43

    
44
	protected static final String ROOT_ELEMENT = "indexRecord";
45

    
46
	protected ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
47

    
48
		@Override
49
		protected XMLInputFactory initialValue() {
50
			return XMLInputFactory.newInstance();
51
		}
52
	};
53

    
54
	protected ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
55

    
56
		@Override
57
		protected XMLOutputFactory initialValue() {
58
			return XMLOutputFactory.newInstance();
59
		}
60
	};
61

    
62
	protected ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
63

    
64
		@Override
65
		protected XMLEventFactory initialValue() {
66
			return XMLEventFactory.newInstance();
67
		}
68
	};
69

    
70
	/**
71
	 * {@inheritDoc}
72
	 */
73
	@Override
74
	public SolrInputDocument parseDocument(final String version, final String inputDocument, final String dsId, final String resultName)
75
			throws XMLStreamException {
76
		return parseDocument(version, inputDocument, dsId, resultName, null);
77
	}
78

    
79
	/**
80
	 * {@inheritDoc}
81
	 * <p>
82
	 * String, com.google.common.base.Function)
83
	 */
84
	@Override
85
	public SolrInputDocument parseDocument(final String version,
86
			final String inputDocument,
87
			final String dsId,
88
			final String resultName,
89
			final ResultTransformer resultTransformer) {
90

    
91
		final StringWriter results = new StringWriter();
92
		final List<Namespace> nsList = new LinkedList<>();
93
		try {
94

    
95
			XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
96

    
97
			final SolrInputDocument indexDocument = new SolrInputDocument();
98

    
99
			while (parser.hasNext()) {
100
				final XMLEvent event = parser.nextEvent();
101
				if ((event != null) && event.isStartElement()) {
102
					final String localName = event.asStartElement().getName().getLocalPart();
103

    
104
					if (ROOT_ELEMENT.equals(localName)) {
105
						nsList.addAll(getNamespaces(event));
106
					} else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) {
107
						final XMLEvent text = parser.nextEvent();
108
						String recordId = getText(text);
109
						indexDocument.addField(INDEX_RECORD_ID, recordId);
110
					} else if (TARGETFIELDS.equals(localName)) {
111
						parseTargetFields(indexDocument, parser);
112
					} else if (resultName.equals(localName)) {
113
						if (resultTransformer == null || !(Mode.empty.equals(resultTransformer.getMode()))) {
114
							copyResult(indexDocument, results, parser, nsList, resultName, resultTransformer);
115
						}
116
					}
117
				}
118
			}
119

    
120
			if (version != null) {
121
				indexDocument.addField(DS_VERSION, version);
122
			}
123

    
124
			if (dsId != null) {
125
				indexDocument.addField(DS_ID, dsId);
126
			}
127

    
128
			if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
129
				indexDocument.clear();
130
				System.err.println("missing indexrecord id:\n" + inputDocument);
131
			}
132

    
133
			return indexDocument;
134
		} catch (XMLStreamException e) {
135
			return new SolrInputDocument();
136
		}
137
	}
138

    
139
	private List<Namespace> getNamespaces(final XMLEvent event) {
140
		final List<Namespace> res = new LinkedList<>();
141
		@SuppressWarnings("unchecked")
142
		Iterator<Namespace> nsIter = event.asStartElement().getNamespaces();
143
		while (nsIter.hasNext()) {
144
			Namespace ns = nsIter.next();
145
			res.add(ns);
146
		}
147
		return res;
148
	}
149

    
150
	/**
151
	 * Parse the targetFields block and add fields to the solr document.
152
	 *
153
	 * @param indexDocument
154
	 * @param parser
155
	 * @throws XMLStreamException
156
	 */
157
	protected void parseTargetFields(final SolrInputDocument indexDocument, final XMLEventReader parser) throws XMLStreamException {
158

    
159
		boolean hasFields = false;
160

    
161
		while (parser.hasNext()) {
162
			final XMLEvent targetEvent = parser.nextEvent();
163
			if (targetEvent.isEndElement() && targetEvent.asEndElement().getName().getLocalPart().equals(TARGETFIELDS)) {
164
				break;
165
			}
166

    
167
			if (targetEvent.isStartElement()) {
168
				final String fieldName = targetEvent.asStartElement().getName().getLocalPart();
169
				final XMLEvent text = parser.nextEvent();
170

    
171
				String data = getText(text);
172

    
173
				addField(indexDocument, fieldName, data);
174
				hasFields = true;
175
			}
176
		}
177

    
178
		if (!hasFields) {
179
			indexDocument.clear();
180
		}
181
	}
182

    
183
	/**
184
	 * Copy the /indexRecord/result element and children, preserving namespace declarations etc.
185
	 *
186
	 * @param indexDocument
187
	 * @param results
188
	 * @param parser
189
	 * @param nsList
190
	 * @throws XMLStreamException
191
	 */
192
	protected void copyResult(final SolrInputDocument indexDocument,
193
			final StringWriter results,
194
			final XMLEventReader parser,
195
			final List<Namespace> nsList,
196
			final String dnetResult,
197
			final ResultTransformer resultTransformer) throws XMLStreamException {
198
		final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results);
199

    
200
		for (Namespace ns : nsList) {
201
			eventFactory.get().createNamespace(ns.getPrefix(), ns.getNamespaceURI());
202
		}
203

    
204
		StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator());
205

    
206
		// new root record
207
		writer.add(newRecord);
208

    
209
		// copy the rest as it is
210
		while (parser.hasNext()) {
211
			final XMLEvent resultEvent = parser.nextEvent();
212

    
213
			// TODO: replace with depth tracking instead of close tag tracking.
214
			if (resultEvent.isEndElement() && resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) {
215
				writer.add(eventFactory.get().createEndElement("", null, RESULT));
216
				break;
217
			}
218

    
219
			writer.add(resultEvent);
220
		}
221
		writer.close();
222

    
223
		if (resultTransformer != null) {
224
			indexDocument.addField(INDEX_RESULT, resultTransformer.apply(results.toString()));
225
		} else {
226
			indexDocument.addField(INDEX_RESULT, results.toString());
227
		}
228
	}
229

    
230
	/**
231
	 * Helper used to add a field to a solr doc. It avoids to add empy fields
232
	 *
233
	 * @param indexDocument
234
	 * @param field
235
	 * @param value
236
	 */
237
	private final void addField(final SolrInputDocument indexDocument, final String field, final String value) {
238
		String cleaned = value.trim();
239
		if (!cleaned.isEmpty()) {
240
			// log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n");
241
			indexDocument.addField(field.toLowerCase(), cleaned);
242
		}
243
	}
244

    
245
	/**
246
	 * Helper used to get the string from a text element.
247
	 *
248
	 * @param text
249
	 * @return
250
	 */
251
	protected final String getText(final XMLEvent text) {
252
		if (text.isEndElement()) // log.warn("skipping because isEndOfElement " + text.asEndElement().getName().getLocalPart());
253
			return "";
254

    
255
		return text.asCharacters().getData();
256
	}
257

    
258
}
(4-4/4)