001/* 002 * Copyright 2006-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.xml; 018 019import java.io.IOException; 020import java.io.InputStream; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.NoSuchElementException; 024 025import javax.xml.namespace.QName; 026import javax.xml.stream.XMLEventReader; 027import javax.xml.stream.XMLInputFactory; 028import javax.xml.stream.XMLStreamException; 029import javax.xml.stream.events.EndElement; 030import javax.xml.stream.events.StartElement; 031import javax.xml.stream.events.XMLEvent; 032 033import org.apache.commons.logging.Log; 034import org.apache.commons.logging.LogFactory; 035import org.springframework.batch.item.NonTransientResourceException; 036import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream; 037import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; 038import org.springframework.batch.item.xml.stax.DefaultFragmentEventReader; 039import org.springframework.batch.item.xml.stax.FragmentEventReader; 040import org.springframework.beans.factory.InitializingBean; 041import org.springframework.core.io.Resource; 042import org.springframework.oxm.Unmarshaller; 043import org.springframework.util.Assert; 044import org.springframework.util.ClassUtils; 045import org.springframework.util.StringUtils; 046 047/** 048 * Item reader for reading XML input based on StAX. 049 * 050 * It extracts fragments from the input XML document which correspond to records for processing. The fragments are 051 * wrapped with StartDocument and EndDocument events so that the fragments can be further processed like standalone XML 052 * documents. 053 * 054 * The implementation is <b>not</b> thread-safe. 055 * 056 * @author Robert Kasanicky 057 * @author Mahmoud Ben Hassine 058 */ 059public class StaxEventItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements 060ResourceAwareItemReaderItemStream<T>, InitializingBean { 061 062 private static final Log logger = LogFactory.getLog(StaxEventItemReader.class); 063 064 private FragmentEventReader fragmentReader; 065 066 private XMLEventReader eventReader; 067 068 private Unmarshaller unmarshaller; 069 070 private Resource resource; 071 072 private InputStream inputStream; 073 074 private List<QName> fragmentRootElementNames; 075 076 private boolean noInput; 077 078 private boolean strict = true; 079 080 private XMLInputFactory xmlInputFactory = StaxUtils.createXmlInputFactory(); 081 082 public StaxEventItemReader() { 083 setName(ClassUtils.getShortName(StaxEventItemReader.class)); 084 } 085 086 /** 087 * In strict mode the reader will throw an exception on 088 * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist. 089 * @param strict true by default 090 */ 091 public void setStrict(boolean strict) { 092 this.strict = strict; 093 } 094 095 @Override 096 public void setResource(Resource resource) { 097 this.resource = resource; 098 } 099 100 /** 101 * @param unmarshaller maps xml fragments corresponding to records to objects 102 */ 103 public void setUnmarshaller(Unmarshaller unmarshaller) { 104 this.unmarshaller = unmarshaller; 105 } 106 107 /** 108 * @param fragmentRootElementName name of the root element of the fragment 109 */ 110 public void setFragmentRootElementName(String fragmentRootElementName) { 111 setFragmentRootElementNames(new String[] {fragmentRootElementName}); 112 } 113 114 /** 115 * @param fragmentRootElementNames list of the names of the root element of the fragment 116 */ 117 public void setFragmentRootElementNames(String[] fragmentRootElementNames) { 118 this.fragmentRootElementNames = new ArrayList<QName>(); 119 for (String fragmentRootElementName : fragmentRootElementNames) { 120 this.fragmentRootElementNames.add(parseFragmentRootElementName(fragmentRootElementName)); 121 } 122 } 123 124 /** 125 * Set the {@link XMLInputFactory}. 126 * @param xmlInputFactory to use 127 */ 128 public void setXmlInputFactory(XMLInputFactory xmlInputFactory) { 129 Assert.notNull(xmlInputFactory, "XMLInputFactory must not be null"); 130 this.xmlInputFactory = xmlInputFactory; 131 } 132 133 /** 134 * Ensure that all required dependencies for the ItemReader to run are provided after all properties have been set. 135 * 136 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() 137 * @throws IllegalArgumentException if the Resource, FragmentDeserializer or FragmentRootElementName is null, or if 138 * the root element is empty. 139 * @throws IllegalStateException if the Resource does not exist. 140 */ 141 @Override 142 public void afterPropertiesSet() throws Exception { 143 Assert.notNull(unmarshaller, "The Unmarshaller must not be null."); 144 Assert.notEmpty(fragmentRootElementNames, "The FragmentRootElementNames must not be empty"); 145 for (QName fragmentRootElementName : fragmentRootElementNames) { 146 Assert.hasText(fragmentRootElementName.getLocalPart(), "The FragmentRootElementNames must not contain empty elements"); 147 } 148 } 149 150 /** 151 * Responsible for moving the cursor before the StartElement of the fragment root. 152 * 153 * This implementation simply looks for the next corresponding element, it does not care about element nesting. You 154 * will need to override this method to correctly handle composite fragments. 155 * 156 * @param reader the {@link XMLEventReader} to be used to find next fragment. 157 * 158 * @return <code>true</code> if next fragment was found, <code>false</code> otherwise. 159 * 160 * @throws NonTransientResourceException if the cursor could not be moved. This will be treated as fatal and 161 * subsequent calls to read will return null. 162 */ 163 protected boolean moveCursorToNextFragment(XMLEventReader reader) throws NonTransientResourceException { 164 try { 165 while (true) { 166 while (reader.peek() != null && !reader.peek().isStartElement()) { 167 reader.nextEvent(); 168 } 169 if (reader.peek() == null) { 170 return false; 171 } 172 QName startElementName = ((StartElement) reader.peek()).getName(); 173 if (isFragmentRootElementName(startElementName)) { 174 return true; 175 } 176 reader.nextEvent(); 177 178 } 179 } 180 catch (XMLStreamException e) { 181 throw new NonTransientResourceException("Error while reading from event reader", e); 182 } 183 } 184 185 @Override 186 protected void doClose() throws Exception { 187 try { 188 if (fragmentReader != null) { 189 fragmentReader.close(); 190 } 191 if (inputStream != null) { 192 inputStream.close(); 193 } 194 } 195 finally { 196 fragmentReader = null; 197 inputStream = null; 198 } 199 200 } 201 202 @Override 203 protected void doOpen() throws Exception { 204 Assert.notNull(resource, "The Resource must not be null."); 205 206 noInput = true; 207 if (!resource.exists()) { 208 if (strict) { 209 throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode)"); 210 } 211 logger.warn("Input resource does not exist " + resource.getDescription()); 212 return; 213 } 214 if (!resource.isReadable()) { 215 if (strict) { 216 throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode)"); 217 } 218 logger.warn("Input resource is not readable " + resource.getDescription()); 219 return; 220 } 221 222 inputStream = resource.getInputStream(); 223 eventReader = xmlInputFactory.createXMLEventReader(inputStream); 224 fragmentReader = new DefaultFragmentEventReader(eventReader); 225 noInput = false; 226 227 } 228 229 /** 230 * Move to next fragment and map it to item. 231 */ 232 @Override 233 protected T doRead() throws IOException, XMLStreamException { 234 235 if (noInput) { 236 return null; 237 } 238 239 T item = null; 240 241 boolean success = false; 242 try { 243 success = moveCursorToNextFragment(fragmentReader); 244 } 245 catch (NonTransientResourceException e) { 246 // Prevent caller from retrying indefinitely since this is fatal 247 noInput = true; 248 throw e; 249 } 250 if (success) { 251 fragmentReader.markStartFragment(); 252 253 try { 254 @SuppressWarnings("unchecked") 255 T mappedFragment = (T) unmarshaller.unmarshal(StaxUtils.getSource(fragmentReader)); 256 item = mappedFragment; 257 } 258 finally { 259 fragmentReader.markFragmentProcessed(); 260 } 261 } 262 263 return item; 264 } 265 266 /* 267 * jumpToItem is overridden because reading in and attempting to bind an entire fragment is unacceptable in a 268 * restart scenario, and may cause exceptions to be thrown that were already skipped in previous runs. 269 */ 270 @Override 271 protected void jumpToItem(int itemIndex) throws Exception { 272 for (int i = 0; i < itemIndex; i++) { 273 try { 274 QName fragmentName = readToStartFragment(); 275 readToEndFragment(fragmentName); 276 } catch (NoSuchElementException e) { 277 if (itemIndex == (i + 1)) { 278 // we can presume a NoSuchElementException on the last item means the EOF was reached on the last run 279 return; 280 } else { 281 // if NoSuchElementException occurs on an item other than the last one, this indicates a problem 282 throw e; 283 } 284 } 285 } 286 } 287 288 /* 289 * Read until the first StartElement tag that matches any of the provided fragmentRootElementNames. Because there may be any 290 * number of tags in between where the reader is now and the fragment start, this is done in a loop until the 291 * element type and name match. 292 */ 293 private QName readToStartFragment() throws XMLStreamException { 294 while (true) { 295 XMLEvent nextEvent = eventReader.nextEvent(); 296 if (nextEvent.isStartElement() 297 && isFragmentRootElementName(((StartElement) nextEvent).getName())) { 298 return ((StartElement) nextEvent).getName(); 299 } 300 } 301 } 302 303 /* 304 * Read until the first EndElement tag that matches the provided fragmentRootElementName. Because there may be any 305 * number of tags in between where the reader is now and the fragment end tag, this is done in a loop until the 306 * element type and name match 307 */ 308 private void readToEndFragment(QName fragmentRootElementName) throws XMLStreamException { 309 while (true) { 310 XMLEvent nextEvent = eventReader.nextEvent(); 311 if (nextEvent.isEndElement() 312 && fragmentRootElementName.equals(((EndElement) nextEvent).getName())) { 313 ourceLineNo">314 } 315 } 316 } 317 318 private boolean isFragmentRootElementName(QName name) { 319 for (QName fragmentRootElementName : fragmentRootElementNames) { 320 if (fragmentRootElementName.getLocalPart().equals(name.getLocalPart())) { 321 if (!StringUtils.hasText(fragmentRootElementName.getNamespaceURI()) 322 || fragmentRootElementName.getNamespaceURI().equals(name.getNamespaceURI())) { 323 return true; 324 } 325 } 326 } 327 return false; 328 } 329 330 private QName parseFragmentRootElementName(String fragmentRootElementName) { 331 String name = fragmentRootElementName; 332 String nameSpace = null; 333 if (fragmentRootElementName.contains("{")) { 334 nameSpace = fragmentRootElementName.replaceAll("\\{(.*)\\}.*", "$1"); 335 name = fragmentRootElementName.replaceAll("\\{.*\\}(.*)", "$1"); 336 } 337 return new QName(nameSpace, name, ""); 338 } 339 340}