Project

General

Profile

1
package eu.dnetlib.index.solr.feed;
2

    
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.HashMap;
6
import java.util.Iterator;
7
import java.util.List;
8

    
9
import javax.xml.stream.XMLEventFactory;
10
import javax.xml.stream.XMLEventReader;
11
import javax.xml.stream.XMLEventWriter;
12
import javax.xml.stream.XMLInputFactory;
13
import javax.xml.stream.XMLOutputFactory;
14
import javax.xml.stream.XMLStreamException;
15
import javax.xml.stream.events.Namespace;
16
import javax.xml.stream.events.StartElement;
17
import javax.xml.stream.events.XMLEvent;
18

    
19
import org.apache.solr.common.SolrInputDocument;
20

    
21
import com.google.common.collect.Lists;
22

    
23
import eu.dnetlib.index.solr.feed.ResultTransformer.Mode;
24

    
25
/**
26
 * Optimized version of the document parser, drop in replacement of InputDocumentFactory.
27
 *
28
 * <p>
29
 * Faster because:
30
 * </p>
31
 * <ul>
32
 * <li>Doesn't create a DOM for the full document</li>
33
 * <li>Doesn't execute xpaths agains the DOM</li>
34
 * <li>Quickly serialize the 'result' element directly in a string.</li>
35
 * <li>Uses less memory: less pressure on GC and allows more threads to process this in parallel</li>
36
 * </ul>
37
 *
38
 * <p>
39
 * This class is fully reentrant and can be invoked in parallel.
40
 * </p>
41
 *
42
 * @author marko
43
 *
44
 */
45
public class StreamingInputDocumentFactory extends InputDocumentFactory {
46

    
47
	protected static final String DEFAULTDNETRESULT = "dnetResult";
48

    
49
	protected static final String TARGETFIELDS = "targetFields";
50

    
51
	protected static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier";
52

    
53
	protected static final String ROOT_ELEMENT = "indexRecord";
54

    
55
	protected static final int MAX_FIELD_LENGTH = 25000;
56

    
57
	protected ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
58

    
59
		@Override
60
		protected XMLInputFactory initialValue() {
61
			return XMLInputFactory.newInstance();
62
		}
63
	};
64

    
65
	protected ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
66

    
67
		@Override
68
		protected XMLOutputFactory initialValue() {
69
			return XMLOutputFactory.newInstance();
70
		}
71
	};
72

    
73
	protected ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
74

    
75
		@Override
76
		protected XMLEventFactory initialValue() {
77
			return XMLEventFactory.newInstance();
78
		}
79
	};
80

    
81
	@Override
82
	public SolrInputDocument parseDocument(final String version, final String inputDocument, final String dsId, final String resultName)
83
			throws XMLStreamException {
84
		return parseDocument(version, inputDocument, dsId, resultName, null);
85
	}
86

    
87
	@Override
88
	public SolrInputDocument parseDocument(final String version,
89
			final String inputDocument,
90
			final String dsId,
91
			final String resultName,
92
			final ResultTransformer resultTransformer) {
93

    
94
		final StringWriter results = new StringWriter();
95
		final List<Namespace> nsList = Lists.newLinkedList();
96
		try {
97

    
98
			XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
99

    
100
			final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>());
101

    
102
			while (parser.hasNext()) {
103
				final XMLEvent event = parser.nextEvent();
104
				if ((event != null) && event.isStartElement()) {
105
					final String localName = event.asStartElement().getName().getLocalPart();
106

    
107
					if (ROOT_ELEMENT.equals(localName)) {
108
						nsList.addAll(getNamespaces(event));
109
					} else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) {
110
						final XMLEvent text = parser.nextEvent();
111
						String recordId = getText(text);
112
						indexDocument.addField(INDEX_RECORD_ID, recordId);
113
					} else if (TARGETFIELDS.equals(localName)) {
114
						parseTargetFields(indexDocument, parser);
115
					} else if (resultName.equals(localName)) {
116
						if (resultTransformer == null || !(Mode.empty.equals(resultTransformer.getMode()))) {
117
							copyResult(indexDocument, results, parser, nsList, resultName, resultTransformer);
118
						}
119
					}
120
				}
121
			}
122

    
123
			if (version != null) {
124
				indexDocument.addField(DS_VERSION, version);
125
			}
126

    
127
			if (dsId != null) {
128
				indexDocument.addField(DS_ID, dsId);
129
			}
130

    
131
			if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
132
				indexDocument.clear();
133
				System.err.println("missing indexrecord id:\n" + inputDocument);
134
			}
135

    
136
			return indexDocument;
137
		} catch (XMLStreamException e) {
138
			return new SolrInputDocument();
139
		}
140
	}
141

    
142
	private List<Namespace> getNamespaces(final XMLEvent event) {
143
		final List<Namespace> res = Lists.newLinkedList();
144
		@SuppressWarnings("unchecked")
145
		Iterator<Namespace> nsIter = event.asStartElement().getNamespaces();
146
		while (nsIter.hasNext()) {
147
			Namespace ns = nsIter.next();
148
			res.add(ns);
149
		}
150
		return res;
151
	}
152

    
153
	/**
154
	 * Parse the targetFields block and add fields to the solr document.
155
	 *
156
	 * @param indexDocument
157
	 * @param parser
158
	 * @throws XMLStreamException
159
	 */
160
	protected void parseTargetFields(final SolrInputDocument indexDocument, final XMLEventReader parser) throws XMLStreamException {
161

    
162
		boolean hasFields = false;
163

    
164
		while (parser.hasNext()) {
165
			final XMLEvent targetEvent = parser.nextEvent();
166
			if (targetEvent.isEndElement() && targetEvent.asEndElement().getName().getLocalPart().equals(TARGETFIELDS)) {
167
				break;
168
			}
169

    
170
			if (targetEvent.isStartElement()) {
171
				final String fieldName = targetEvent.asStartElement().getName().getLocalPart();
172
				final XMLEvent text = parser.nextEvent();
173

    
174
				String data = getText(text);
175

    
176
				addField(indexDocument, fieldName, data);
177
				hasFields = true;
178
			}
179
		}
180

    
181
		if (!hasFields) {
182
			indexDocument.clear();
183
		}
184
	}
185

    
186
	/**
187
	 * Copy the /indexRecord/result element and children, preserving namespace declarations etc.
188
	 *
189
	 * @param indexDocument
190
	 * @param results
191
	 * @param parser
192
	 * @param nsList
193
	 * @throws XMLStreamException
194
	 */
195
	protected void copyResult(final SolrInputDocument indexDocument,
196
			final StringWriter results,
197
			final XMLEventReader parser,
198
			final List<Namespace> nsList,
199
			final String dnetResult,
200
			final ResultTransformer resultTransformer) throws XMLStreamException {
201
		final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results);
202

    
203
		for (Namespace ns : nsList) {
204
			eventFactory.get().createNamespace(ns.getPrefix(), ns.getNamespaceURI());
205
		}
206

    
207
		StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator());
208

    
209
		// new root record
210
		writer.add(newRecord);
211

    
212
		// copy the rest as it is
213
		while (parser.hasNext()) {
214
			final XMLEvent resultEvent = parser.nextEvent();
215

    
216
			// TODO: replace with depth tracking instead of close tag tracking.
217
			if (resultEvent.isEndElement() && resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) {
218
				writer.add(eventFactory.get().createEndElement("", null, RESULT));
219
				break;
220
			}
221

    
222
			writer.add(resultEvent);
223
		}
224
		writer.close();
225

    
226
		if (resultTransformer != null) {
227
			indexDocument.addField(INDEX_RESULT, resultTransformer.apply(results.toString()));
228
		} else {
229
			indexDocument.addField(INDEX_RESULT, results.toString());
230
		}
231
	}
232

    
233
	/**
234
	 * Helper used to add a field to a solr doc. It avoids to add empy fields
235
	 *
236
	 * @param indexDocument
237
	 * @param field
238
	 * @param value
239
	 */
240
	private final void addField(final SolrInputDocument indexDocument, final String field, final String value) {
241
		String cleaned = value.trim();
242
		if (!cleaned.isEmpty()) {
243
			// log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n");
244
			indexDocument.addField(field.toLowerCase(), cleaned);
245
		}
246
	}
247

    
248
	/**
249
	 * Helper used to get the string from a text element.
250
	 *
251
	 * @param text
252
	 * @return the
253
	 */
254
	protected final String getText(final XMLEvent text) {
255
		if (text.isEndElement()) // log.warn("skipping because isEndOfElement " + text.asEndElement().getName().getLocalPart());
256
			return "";
257

    
258
		final String data = text.asCharacters().getData();
259
		if (data != null && data.length() > MAX_FIELD_LENGTH) {
260
			return data.substring(0, MAX_FIELD_LENGTH);
261
		}
262

    
263
		return data;
264
	}
265

    
266
}
(4-4/4)