/*
 * Copyright 2006-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.batch.item.xml;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.NoSuchElementException;
024
025import javax.xml.namespace.QName;
026import javax.xml.stream.XMLEventReader;
027import javax.xml.stream.XMLInputFactory;
028import javax.xml.stream.XMLStreamException;
029import javax.xml.stream.events.EndElement;
030import javax.xml.stream.events.StartElement;
031import javax.xml.stream.events.XMLEvent;
032
033import org.apache.commons.logging.Log;
034import org.apache.commons.logging.LogFactory;
035import org.springframework.batch.item.NonTransientResourceException;
036import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
037import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
038import org.springframework.batch.item.xml.stax.DefaultFragmentEventReader;
039import org.springframework.batch.item.xml.stax.FragmentEventReader;
040import org.springframework.beans.factory.InitializingBean;
041import org.springframework.core.io.Resource;
042import org.springframework.oxm.Unmarshaller;
043import org.springframework.util.Assert;
044import org.springframework.util.ClassUtils;
045import org.springframework.util.StringUtils;
046
047/**
048 * Item reader for reading XML input based on StAX.
049 * 
050 * It extracts fragments from the input XML document which correspond to records for processing. The fragments are
051 * wrapped with StartDocument and EndDocument events so that the fragments can be further processed like standalone XML
052 * documents.
053 * 
054 * The implementation is <b>not</b> thread-safe.
055 * 
056 * @author Robert Kasanicky
057 * @author Mahmoud Ben Hassine
058 */
059public class StaxEventItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
060ResourceAwareItemReaderItemStream<T>, InitializingBean {
061
062        private static final Log logger = LogFactory.getLog(StaxEventItemReader.class);
063
064        private FragmentEventReader fragmentReader;
065
066        private XMLEventReader eventReader;
067
068        private Unmarshaller unmarshaller;
069
070        private Resource resource;
071
072        private InputStream inputStream;
073
074        private List<QName> fragmentRootElementNames;
075
076        private boolean noInput;
077
078        private boolean strict = true;
079
080        private XMLInputFactory xmlInputFactory = StaxUtils.createXmlInputFactory();
081
082        public StaxEventItemReader() {
083                setName(ClassUtils.getShortName(StaxEventItemReader.class));
084        }
085
086        /**
087         * In strict mode the reader will throw an exception on
088         * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist.
089         * @param strict true by default
090         */
091        public void setStrict(boolean strict) {
092                this.strict = strict;
093        }
094
095        @Override
096        public void setResource(Resource resource) {
097                this.resource = resource;
098        }
099
100        /**
101         * @param unmarshaller maps xml fragments corresponding to records to objects
102         */
103        public void setUnmarshaller(Unmarshaller unmarshaller) {
104                this.unmarshaller = unmarshaller;
105        }
106
107        /**
108         * @param fragmentRootElementName name of the root element of the fragment
109         */
110        public void setFragmentRootElementName(String fragmentRootElementName) {
111                setFragmentRootElementNames(new String[] {fragmentRootElementName});
112        }
113
114        /**
115         * @param fragmentRootElementNames list of the names of the root element of the fragment
116         */
117        public void setFragmentRootElementNames(String[] fragmentRootElementNames) {
118                this.fragmentRootElementNames = new ArrayList<QName>();
119                for (String fragmentRootElementName : fragmentRootElementNames) {
120                        this.fragmentRootElementNames.add(parseFragmentRootElementName(fragmentRootElementName));
121                }
122        }
123
124        /**
125         * Set the {@link XMLInputFactory}.
126         * @param xmlInputFactory to use
127         */
128        public void setXmlInputFactory(XMLInputFactory xmlInputFactory) {
129                Assert.notNull(xmlInputFactory, "XMLInputFactory must not be null");
130                this.xmlInputFactory = xmlInputFactory;
131        }
132
133        /**
134         * Ensure that all required dependencies for the ItemReader to run are provided after all properties have been set.
135         * 
136         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
137         * @throws IllegalArgumentException if the Resource, FragmentDeserializer or FragmentRootElementName is null, or if
138         * the root element is empty.
139         * @throws IllegalStateException if the Resource does not exist.
140         */
141        @Override
142        public void afterPropertiesSet() throws Exception {
143                Assert.notNull(unmarshaller, "The Unmarshaller must not be null.");
144                Assert.notEmpty(fragmentRootElementNames, "The FragmentRootElementNames must not be empty");
145                for (QName fragmentRootElementName : fragmentRootElementNames) {
146                        Assert.hasText(fragmentRootElementName.getLocalPart(), "The FragmentRootElementNames must not contain empty elements");
147                }               
148        }
149
150        /**
151         * Responsible for moving the cursor before the StartElement of the fragment root.
152         * 
153         * This implementation simply looks for the next corresponding element, it does not care about element nesting. You
154         * will need to override this method to correctly handle composite fragments.
155         *
156         * @param reader the {@link XMLEventReader} to be used to find next fragment.
157         * 
158         * @return <code>true</code> if next fragment was found, <code>false</code> otherwise.
159         * 
160         * @throws NonTransientResourceException if the cursor could not be moved. This will be treated as fatal and
161         * subsequent calls to read will return null.
162         */
163        protected boolean moveCursorToNextFragment(XMLEventReader reader) throws NonTransientResourceException {
164                try {
165                        while (true) {
166                                while (reader.peek() != null && !reader.peek().isStartElement()) {
167                                        reader.nextEvent();
168                                }
169                                if (reader.peek() == null) {
170                                        return false;
171                                }
172                                QName startElementName = ((StartElement) reader.peek()).getName();
173                                if (isFragmentRootElementName(startElementName)) {
174                                        return true;
175                                }
176                                reader.nextEvent();
177
178                        }
179                }
180                catch (XMLStreamException e) {
181                        throw new NonTransientResourceException("Error while reading from event reader", e);
182                }
183        }
184
185        @Override
186        protected void doClose() throws Exception {
187                try {
188                        if (fragmentReader != null) {
189                                fragmentReader.close();
190                        }
191                        if (inputStream != null) {
192                                inputStream.close();
193                        }
194                }
195                finally {
196                        fragmentReader = null;
197                        inputStream = null;
198                }
199
200        }
201
202        @Override
203        protected void doOpen() throws Exception {
204                Assert.notNull(resource, "The Resource must not be null.");
205
206                noInput = true;
207                if (!resource.exists()) {
208                        if (strict) {
209                                throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode)");
210                        }
211                        logger.warn("Input resource does not exist " + resource.getDescription());
212                        return;
213                }
214                if (!resource.isReadable()) {
215                        if (strict) {
216                                throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode)");
217                        }
218                        logger.warn("Input resource is not readable " + resource.getDescription());
219                        return;
220                }
221
222                inputStream = resource.getInputStream();
223                eventReader = xmlInputFactory.createXMLEventReader(inputStream);
224                fragmentReader = new DefaultFragmentEventReader(eventReader);
225                noInput = false;
226
227        }
228
229        /**
230         * Move to next fragment and map it to item.
231         */
232        @Override
233        protected T doRead() throws IOException, XMLStreamException {
234
235                if (noInput) {
236                        return null;
237                }
238
239                T item = null;
240
241                boolean success = false;
242                try {
243                        success = moveCursorToNextFragment(fragmentReader);
244                }
245                catch (NonTransientResourceException e) {
246                        // Prevent caller from retrying indefinitely since this is fatal
247                        noInput = true;
248                        throw e;
249                }
250                if (success) {
251                        fragmentReader.markStartFragment();
252
253                        try {
254                                @SuppressWarnings("unchecked")
255                                T mappedFragment = (T) unmarshaller.unmarshal(StaxUtils.getSource(fragmentReader));
256                                item = mappedFragment;
257                        }
258                        finally {
259                                fragmentReader.markFragmentProcessed();
260                        }
261                }
262
263                return item;
264        }
265
266        /*
267         * jumpToItem is overridden because reading in and attempting to bind an entire fragment is unacceptable in a
268         * restart scenario, and may cause exceptions to be thrown that were already skipped in previous runs.
269         */
270        @Override
271        protected void jumpToItem(int itemIndex) throws Exception {
272                for (int i = 0; i < itemIndex; i++) {
273                        try {
274                                QName fragmentName = readToStartFragment();
275                                readToEndFragment(fragmentName);
276                        } catch (NoSuchElementException e) {
277                                if (itemIndex == (i + 1)) {
278                                        // we can presume a NoSuchElementException on the last item means the EOF was reached on the last run
279                                        return;
280                                } else {
281                                        // if NoSuchElementException occurs on an item other than the last one, this indicates a problem
282                                        throw e;
283                                }
284                        }
285                }
286        }
287
288        /*
289         * Read until the first StartElement tag that matches any of the provided fragmentRootElementNames. Because there may be any
290         * number of tags in between where the reader is now and the fragment start, this is done in a loop until the
291         * element type and name match.
292         */
293        private QName readToStartFragment() throws XMLStreamException {
294                while (true) {
295                        XMLEvent nextEvent = eventReader.nextEvent();
296                        if (nextEvent.isStartElement()
297                                        && isFragmentRootElementName(((StartElement) nextEvent).getName())) {
298                                return ((StartElement) nextEvent).getName();
299                        }
300                }
301        }
302
303        /*
304         * Read until the first EndElement tag that matches the provided fragmentRootElementName. Because there may be any
305         * number of tags in between where the reader is now and the fragment end tag, this is done in a loop until the
306         * element type and name match
307         */
308        private void readToEndFragment(QName fragmentRootElementName) throws XMLStreamException {
309                while (true) {
310                        XMLEvent nextEvent = eventReader.nextEvent();
311                        if (nextEvent.isEndElement()
312                                        && fragmentRootElementName.equals(((EndElement) nextEvent).getName())) {
313                               ourceLineNo">314                        }
315                }
316        }
317        
318        private boolean isFragmentRootElementName(QName name) {
319                for (QName fragmentRootElementName : fragmentRootElementNames) {
320                        if (fragmentRootElementName.getLocalPart().equals(name.getLocalPart())) {
321                                if (!StringUtils.hasText(fragmentRootElementName.getNamespaceURI())
322                                                || fragmentRootElementName.getNamespaceURI().equals(name.getNamespaceURI())) {                                  
323                                        return true;
324                                }
325                        }
326                }
327                return false;
328        }       
329        
330        private QName parseFragmentRootElementName(String fragmentRootElementName) {
331                String name = fragmentRootElementName;
332                String nameSpace = null;
333                if (fragmentRootElementName.contains("{")) {
334                        nameSpace = fragmentRootElementName.replaceAll("\\{(.*)\\}.*", "$1");
335                        name = fragmentRootElementName.replaceAll("\\{.*\\}(.*)", "$1");
336                }
337                return new QName(nameSpace, name, "");
338        }
339        
340}