Project

General

Profile

1
package eu.dnetlib.functionality.index.solr.feed;
2

    
3
import java.io.StringReader;
4
import java.io.StringWriter;
5
import java.util.HashMap;
6
import java.util.Iterator;
7
import java.util.List;
8
import javax.xml.stream.*;
9
import javax.xml.stream.events.Namespace;
10
import javax.xml.stream.events.StartElement;
11
import javax.xml.stream.events.XMLEvent;
12

    
13
import com.google.common.collect.Lists;
14
import eu.dnetlib.functionality.index.solr.feed.ResultTransformer.Mode;
15
import org.apache.solr.common.SolrInputDocument;
16

    
17
/**
18
 * Optimized version of the document parser, drop in replacement of InputDocumentFactory.
19
 *
20
 * <p>
21
 * Faster because:
22
 * </p>
23
 * <ul>
24
 * <li>Doesn't create a DOM for the full document</li>
25
 * <li>Doesn't execute xpaths agains the DOM</li>
26
 * <li>Quickly serialize the 'result' element directly in a string.</li>
27
 * <li>Uses less memory: less pressure on GC and allows more threads to process this in parallel</li>
28
 * </ul>
29
 *
30
 * <p>
31
 * This class is fully reentrant and can be invoked in parallel.
32
 * </p>
33
 *
34
 * @author marko
35
 *
36
 */
37
public class StreamingInputDocumentFactory extends InputDocumentFactory {
38

    
39
	protected static final String DEFAULTDNETRESULT = "dnetResult";
40

    
41
	protected static final String TARGETFIELDS = "targetFields";
42

    
43
	protected static final String INDEX_RECORD_ID_ELEMENT = "indexRecordIdentifier";
44

    
45
	protected static final String ROOT_ELEMENT = "indexRecord";
46

    
47
	protected static final int MAX_FIELD_LENGTH = 25000;
48

    
49
	protected ThreadLocal<XMLInputFactory> inputFactory = new ThreadLocal<XMLInputFactory>() {
50

    
51
		@Override
52
		protected XMLInputFactory initialValue() {
53
			return XMLInputFactory.newInstance();
54
		}
55
	};
56

    
57
	protected ThreadLocal<XMLOutputFactory> outputFactory = new ThreadLocal<XMLOutputFactory>() {
58

    
59
		@Override
60
		protected XMLOutputFactory initialValue() {
61
			return XMLOutputFactory.newInstance();
62
		}
63
	};
64

    
65
	protected ThreadLocal<XMLEventFactory> eventFactory = new ThreadLocal<XMLEventFactory>() {
66

    
67
		@Override
68
		protected XMLEventFactory initialValue() {
69
			return XMLEventFactory.newInstance();
70
		}
71
	};
72

    
73
	@Override
74
	public SolrInputDocument parseDocument(final String version, final String inputDocument, final String dsId, final String resultName)
75
			throws XMLStreamException {
76
		return parseDocument(version, inputDocument, dsId, resultName, null);
77
	}
78

    
79
	@Override
80
	public SolrInputDocument parseDocument(final String version,
81
			final String inputDocument,
82
			final String dsId,
83
			final String resultName,
84
			final ResultTransformer resultTransformer) {
85

    
86
		final StringWriter results = new StringWriter();
87
		final List<Namespace> nsList = Lists.newLinkedList();
88
		try {
89

    
90
			XMLEventReader parser = inputFactory.get().createXMLEventReader(new StringReader(inputDocument));
91

    
92
			final SolrInputDocument indexDocument = new SolrInputDocument(new HashMap<>());
93

    
94
			while (parser.hasNext()) {
95
				final XMLEvent event = parser.nextEvent();
96
				if ((event != null) && event.isStartElement()) {
97
					final String localName = event.asStartElement().getName().getLocalPart();
98

    
99
					if (ROOT_ELEMENT.equals(localName)) {
100
						nsList.addAll(getNamespaces(event));
101
					} else if (INDEX_RECORD_ID_ELEMENT.equals(localName)) {
102
						final XMLEvent text = parser.nextEvent();
103
						String recordId = getText(text);
104
						indexDocument.addField(INDEX_RECORD_ID, recordId);
105
					} else if (TARGETFIELDS.equals(localName)) {
106
						parseTargetFields(indexDocument, parser);
107
					} else if (resultName.equals(localName)) {
108
						if (resultTransformer == null || !(Mode.empty.equals(resultTransformer.getMode()))) {
109
							copyResult(indexDocument, results, parser, nsList, resultName, resultTransformer);
110
						}
111
					}
112
				}
113
			}
114

    
115
			if (version != null) {
116
				indexDocument.addField(DS_VERSION, version);
117
			}
118

    
119
			if (dsId != null) {
120
				indexDocument.addField(DS_ID, dsId);
121
			}
122

    
123
			if (!indexDocument.containsKey(INDEX_RECORD_ID)) {
124
				indexDocument.clear();
125
				System.err.println("missing indexrecord id:\n" + inputDocument);
126
			}
127

    
128
			return indexDocument;
129
		} catch (XMLStreamException e) {
130
			return new SolrInputDocument();
131
		}
132
	}
133

    
134
	private List<Namespace> getNamespaces(final XMLEvent event) {
135
		final List<Namespace> res = Lists.newLinkedList();
136
		@SuppressWarnings("unchecked")
137
		Iterator<Namespace> nsIter = event.asStartElement().getNamespaces();
138
		while (nsIter.hasNext()) {
139
			Namespace ns = nsIter.next();
140
			res.add(ns);
141
		}
142
		return res;
143
	}
144

    
145
	/**
146
	 * Parse the targetFields block and add fields to the solr document.
147
	 *
148
	 * @param indexDocument
149
	 * @param parser
150
	 * @throws XMLStreamException
151
	 */
152
	protected void parseTargetFields(final SolrInputDocument indexDocument, final XMLEventReader parser) throws XMLStreamException {
153

    
154
		boolean hasFields = false;
155

    
156
		while (parser.hasNext()) {
157
			final XMLEvent targetEvent = parser.nextEvent();
158
			if (targetEvent.isEndElement() && targetEvent.asEndElement().getName().getLocalPart().equals(TARGETFIELDS)) {
159
				break;
160
			}
161

    
162
			if (targetEvent.isStartElement()) {
163
				final String fieldName = targetEvent.asStartElement().getName().getLocalPart();
164
				final XMLEvent text = parser.nextEvent();
165

    
166
				String data = getText(text);
167

    
168
				addField(indexDocument, fieldName, data);
169
				hasFields = true;
170
			}
171
		}
172

    
173
		if (!hasFields) {
174
			indexDocument.clear();
175
		}
176
	}
177

    
178
	/**
179
	 * Copy the /indexRecord/result element and children, preserving namespace declarations etc.
180
	 *
181
	 * @param indexDocument
182
	 * @param results
183
	 * @param parser
184
	 * @param nsList
185
	 * @throws XMLStreamException
186
	 */
187
	protected void copyResult(final SolrInputDocument indexDocument,
188
			final StringWriter results,
189
			final XMLEventReader parser,
190
			final List<Namespace> nsList,
191
			final String dnetResult,
192
			final ResultTransformer resultTransformer) throws XMLStreamException {
193
		final XMLEventWriter writer = outputFactory.get().createXMLEventWriter(results);
194

    
195
		for (Namespace ns : nsList) {
196
			eventFactory.get().createNamespace(ns.getPrefix(), ns.getNamespaceURI());
197
		}
198

    
199
		StartElement newRecord = eventFactory.get().createStartElement("", null, RESULT, null, nsList.iterator());
200

    
201
		// new root record
202
		writer.add(newRecord);
203

    
204
		// copy the rest as it is
205
		while (parser.hasNext()) {
206
			final XMLEvent resultEvent = parser.nextEvent();
207

    
208
			// TODO: replace with depth tracking instead of close tag tracking.
209
			if (resultEvent.isEndElement() && resultEvent.asEndElement().getName().getLocalPart().equals(dnetResult)) {
210
				writer.add(eventFactory.get().createEndElement("", null, RESULT));
211
				break;
212
			}
213

    
214
			writer.add(resultEvent);
215
		}
216
		writer.close();
217

    
218
		if (resultTransformer != null) {
219
			indexDocument.addField(INDEX_RESULT, resultTransformer.apply(results.toString()));
220
		} else {
221
			indexDocument.addField(INDEX_RESULT, results.toString());
222
		}
223
	}
224

    
225
	/**
226
	 * Helper used to add a field to a solr doc. It avoids to add empy fields
227
	 *
228
	 * @param indexDocument
229
	 * @param field
230
	 * @param value
231
	 */
232
	private final void addField(final SolrInputDocument indexDocument, final String field, final String value) {
233
		String cleaned = value.trim();
234
		if (!cleaned.isEmpty()) {
235
			// log.info("\n\n adding field " + field.toLowerCase() + " value: " + cleaned + "\n");
236
			indexDocument.addField(field.toLowerCase(), cleaned);
237
		}
238
	}
239

    
240
	/**
241
	 * Helper used to get the string from a text element.
242
	 *
243
	 * @param text
244
	 * @return the element text value
245
	 */
246
	protected final String getText(final XMLEvent text) {
247
		if (text.isEndElement()) // log.warn("skipping because isEndOfElement " + text.asEndElement().getName().getLocalPart());
248
			return "";
249

    
250
		final String data = text.asCharacters().getData();
251
		if (data != null && data.length() > MAX_FIELD_LENGTH) {
252
			return data.substring(0, MAX_FIELD_LENGTH);
253
		}
254

    
255
		return data;
256
	}
257

    
258
}
(3-3/3)