001/* 002 * Copyright 2006-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.xml; 018 019import java.io.BufferedWriter; 020import java.io.File; 021import java.io.FileOutputStream; 022import java.io.IOException; 023import java.io.OutputStreamWriter; 024import java.io.UnsupportedEncodingException; 025import java.io.Writer; 026import java.nio.channels.FileChannel; 027import java.util.Collections; 028import java.util.List; 029import java.util.Map; 030import javax.xml.namespace.QName; 031import javax.xml.stream.FactoryConfigurationError; 032import javax.xml.stream.XMLEventFactory; 033import javax.xml.stream.XMLEventWriter; 034import javax.xml.stream.XMLOutputFactory; 035import javax.xml.stream.XMLStreamException; 036import javax.xml.transform.Result; 037 038import org.apache.commons.logging.Log; 039import org.apache.commons.logging.LogFactory; 040 041import org.springframework.batch.item.ExecutionContext; 042import org.springframework.batch.item.ItemStreamException; 043import org.springframework.batch.item.ItemWriter; 044import org.springframework.batch.item.WriteFailedException; 045import org.springframework.batch.item.WriterNotOpenException; 046import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream; 047import org.springframework.batch.item.support.AbstractItemStreamItemWriter; 048import org.springframework.batch.item.util.FileUtils; 049import org.springframework.batch.item.xml.stax.NoStartEndDocumentStreamWriter; 050import org.springframework.batch.item.xml.stax.UnclosedElementCollectingEventWriter; 051import org.springframework.batch.item.xml.stax.UnopenedElementClosingEventWriter; 052import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter; 053import org.springframework.beans.factory.InitializingBean; 054import org.springframework.core.io.Resource; 055import org.springframework.dao.DataAccessResourceFailureException; 056import org.springframework.oxm.Marshaller; 057import org.springframework.oxm.XmlMappingException; 058import org.springframework.util.Assert; 059import org.springframework.util.ClassUtils; 060import org.springframework.util.CollectionUtils; 061import org.springframework.util.StringUtils; 062 063/** 064 * An implementation of {@link ItemWriter} which uses StAX and 065 * {@link Marshaller} for serializing object to XML. 066 * 067 * This item writer also provides restart, statistics and transaction features 068 * by implementing corresponding interfaces. 069 * 070 * The implementation is <b>not</b> thread-safe. 071 * 072 * @author Peter Zozom 073 * @author Robert Kasanicky 074 * @author Michael Minella 075 * 076 */ 077public class StaxEventItemWriter<T> extends AbstractItemStreamItemWriter<T> implements 078ResourceAwareItemWriterItemStream<T>, InitializingBean { 079 080 private static final Log log = LogFactory.getLog(StaxEventItemWriter.class); 081 082 // default encoding 083 public static final String DEFAULT_ENCODING = "UTF-8"; 084 085 // default encoding 086 public static final String DEFAULT_XML_VERSION = "1.0"; 087 088 // default root tag name 089 public static final String DEFAULT_ROOT_TAG_NAME = "root"; 090 091 // restart data property name 092 private static final String RESTART_DATA_NAME = "position"; 093 094 // unclosed header callback elements property name 095 private static final String UNCLOSED_HEADER_CALLBACK_ELEMENTS_NAME = "unclosedHeaderCallbackElements"; 096 097 // restart data property name 098 private static final String WRITE_STATISTICS_NAME = "record.count"; 099 100 // file system resource 101 private Resource resource; 102 103 // xml marshaller 104 private Marshaller marshaller; 105 106 // encoding to be used while reading from the resource 107 private String encoding = DEFAULT_ENCODING; 108 109 // XML version 110 private String version = DEFAULT_XML_VERSION; 111 112 // name of the root tag 113 private String rootTagName = DEFAULT_ROOT_TAG_NAME; 114 115 // namespace prefix of the root tag 116 private String rootTagNamespacePrefix = ""; 117 118 // namespace of the root tag 119 private String rootTagNamespace = ""; 120 121 // root element attributes 122 private Map<String, String> rootElementAttributes = null; 123 124 // TRUE means, that output file will be overwritten if exists - default is 125 // TRUE 126 private boolean overwriteOutput = true; 127 128 // file channel 129 private FileChannel channel; 130 131 // wrapper for XML event writer that swallows StartDocument and EndDocument 132 // events 133 private XMLEventWriter eventWriter; 134 135 // XML event writer 136 private XMLEventWriter delegateEventWriter; 137 138 // current count of processed records 139 private long currentRecordCount = 0; 140 141 private boolean saveState = true; 142 143 private StaxWriterCallback headerCallback; 144 145 private StaxWriterCallback footerCallback; 146 147 private Writer bufferedWriter; 148 149 private boolean transactional = true; 150 151 private boolean forceSync; 152 153 private boolean shouldDeleteIfEmpty = false; 154 155 private boolean restarted = false; 156 157 private boolean initialized = false; 158 159 // List holding the QName of elements that were opened in the header callback, but not closed 160 private List<QName> unclosedHeaderCallbackElements = Collections.emptyList(); 161 162 public StaxEventItemWriter() { 163 setExecutionContextName(ClassUtils.getShortName(StaxEventItemWriter.class)); 164 } 165 166 /** 167 * Set output file. 168 * 169 * @param resource the output file 170 */ 171 @Override 172 public void setResource(Resource resource) { 173 this.resource = resource; 174 } 175 176 /** 177 * Set Object to XML marshaller. 178 * 179 * @param marshaller the Object to XML marshaller 180 */ 181 public void setMarshaller(Marshaller marshaller) { 182 this.marshaller = marshaller; 183 } 184 185 /** 186 * headerCallback is called before writing any items. 187 * 188 * @param headerCallback the {@link StaxWriterCallback} to be called prior to writing items. 189 */ 190 public void setHeaderCallback(StaxWriterCallback headerCallback) { 191 this.headerCallback = headerCallback; 192 } 193 194 /** 195 * footerCallback is called after writing all items but before closing the 196 * file. 197 * 198 *@param footerCallback the {@link StaxWriterCallback} to be called after writing items. 199 */ 200 public void setFooterCallback(StaxWriterCallback footerCallback) { 201 this.footerCallback = footerCallback; 202 } 203 204 /** 205 * Flag to indicate that writes should be deferred to the end of a 206 * transaction if present. Defaults to true. 207 * 208 * @param transactional the flag to set 209 */ 210 public void setTransactional(boolean transactional) { 211 this.transactional = transactional; 212 } 213 214 /** 215 * Flag to indicate that changes should be force-synced to disk on flush. 216 * Defaults to false, which means that even with a local disk changes could 217 * be lost if the OS crashes in between a write and a cache flush. Setting 218 * to true may result in slower performance for usage patterns involving 219 * many frequent writes. 220 * 221 * @param forceSync the flag value to set 222 */ 223 public void setForceSync(boolean forceSync) { 224 this.forceSync = forceSync; 225 } 226 227 /** 228 * Flag to indicate that the target file should be deleted if no items have 229 * been written (other than header and footer) on close. Defaults to false. 230 * 231 * @param shouldDeleteIfEmpty the flag value to set 232 */ 233 public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) { 234 this.shouldDeleteIfEmpty = shouldDeleteIfEmpty; 235 } 236 237 /** 238 * Get used encoding. 239 * 240 * @return the encoding used 241 */ 242 public String getEncoding() { 243 return encoding; 244 } 245 246 /** 247 * Set encoding to be used for output file. 248 * 249 * @param encoding the encoding to be used 250 */ 251 public void setEncoding(String encoding) { 252 this.encoding = encoding; 253 } 254 255 /** 256 * Get XML version. 257 * 258 * @return the XML version used 259 */ 260 public String getVersion() { 261 return version; 262 } 263 264 /** 265 * Set XML version to be used for output XML. 266 * 267 * @param version the XML version to be used 268 */ 269 public void setVersion(String version) { 270 this.version = version; 271 } 272 273 /** 274 * Get the tag name of the root element. 275 * 276 * @return the root element tag name 277 */ 278 public String getRootTagName() { 279 return rootTagName; 280 } 281 282 /** 283 * Set the tag name of the root element. If not set, default name is used 284 * ("root"). Namespace URI and prefix can also be set optionally using the 285 * notation: 286 * 287 * <pre> 288 * {uri}prefix:root 289 * </pre> 290 * 291 * The prefix is optional (defaults to empty), but if it is specified then 292 * the uri must be provided. In addition you might want to declare other 293 * namespaces using the {@link #setRootElementAttributes(Map) root 294 * attributes}. 295 * 296 * @param rootTagName the tag name to be used for the root element 297 */ 298 public void setRootTagName(String rootTagName) { 299 this.rootTagName = rootTagName; 300 } 301 302 /** 303 * Get the namespace prefix of the root element. Empty by default. 304 * 305 * @return the rootTagNamespacePrefix 306 */ 307 public String getRootTagNamespacePrefix() { 308 return rootTagNamespacePrefix; 309 } 310 311 /** 312 * Get the namespace of the root element. 313 * 314 * @return the rootTagNamespace 315 */ 316 public String getRootTagNamespace() { 317 return rootTagNamespace; 318 } 319 320 /** 321 * Get attributes of the root element. 322 * 323 * @return attributes of the root element 324 */ 325 public Map<String, String> getRootElementAttributes() { 326 return rootElementAttributes; 327 } 328 329 /** 330 * Set the root element attributes to be written. If any of the key names 331 * begin with "xmlns:" then they are treated as namespace declarations. 332 * 333 * @param rootElementAttributes attributes of the root element 334 */ 335 public void setRootElementAttributes(Map<String, String> rootElementAttributes) { 336 this.rootElementAttributes = rootElementAttributes; 337 } 338 339 /** 340 * Set "overwrite" flag for the output file. Flag is ignored when output 341 * file processing is restarted. 342 * 343 * @param overwriteOutput If set to true, output file will be overwritten 344 * (this flag is ignored when processing is restart). 345 */ 346 public void setOverwriteOutput(boolean overwriteOutput) { 347 this.overwriteOutput = overwriteOutput; 348 } 349 350 public void setSaveState(boolean saveState) { 351 this.saveState = saveState; 352 } 353 354 /** 355 * @throws Exception thrown if error occurs 356 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() 357 */ 358 @Override 359 public void afterPropertiesSet() throws Exception { 360 Assert.notNull(marshaller, "A Marshaller is required"); 361 if (rootTagName.contains("{")) { 362 rootTagNamespace = rootTagName.replaceAll("\\{(.*)\\}.*", "$1"); 363 rootTagName = rootTagName.replaceAll("\\{.*\\}(.*)", "$1"); 364 if (rootTagName.contains(":")) { 365 rootTagNamespacePrefix = rootTagName.replaceAll("(.*):.*", "$1"); 366 rootTagName = rootTagName.replaceAll(".*:(.*)", "$1"); 367 } 368 } 369 } 370 371 /** 372 * Open the output source 373 * 374 * @param executionContext the batch context. 375 * 376 * @see org.springframework.batch.item.ItemStream#open(ExecutionContext) 377 */ 378 @SuppressWarnings("unchecked") 379 @Override 380 public void open(ExecutionContext executionContext) { 381 super.open(executionContext); 382 383 Assert.notNull(resource, "The resource must be set"); 384 385 long startAtPosition = 0; 386 387 // if restart data is provided, restart from provided offset 388 // otherwise start from beginning 389 if (executionContext.containsKey(getExecutionContextKey(RESTART_DATA_NAME))) { 390 startAtPosition = executionContext.getLong(getExecutionContextKey(RESTART_DATA_NAME)); 391 currentRecordCount = executionContext.getLong(getExecutionContextKey(WRITE_STATISTICS_NAME)); 392 if (executionContext.containsKey(getExecutionContextKey(UNCLOSED_HEADER_CALLBACK_ELEMENTS_NAME))) { 393 unclosedHeaderCallbackElements = (List<QName>) executionContext 394 .get(getExecutionContextKey(UNCLOSED_HEADER_CALLBACK_ELEMENTS_NAME)); 395 } 396 397 restarted = true; 398 if (shouldDeleteIfEmpty && currentRecordCount == 0) { 399 // previous execution deleted the output file because no items were written 400 restarted = false; 401 startAtPosition = 0; 402 } else { 403 restarted = true; 404 } 405 } else { 406 currentRecordCount = 0; 407 restarted = false; 408 } 409 410 open(startAtPosition); 411 412 if (startAtPosition == 0) { 413 try { 414 if (headerCallback != null) { 415 UnclosedElementCollectingEventWriter headerCallbackWriter = new UnclosedElementCollectingEventWriter(delegateEventWriter); 416 headerCallback.write(headerCallbackWriter); 417 unclosedHeaderCallbackElements = headerCallbackWriter.getUnclosedElements(); 418 } 419 } 420 catch (IOException e) { 421 throw new ItemStreamException("Failed to write headerItems", e); 422 } 423 } 424 425 this.initialized = true; 426 427 } 428 429 /** 430 * Helper method for opening output source at given file position 431 */ 432 private void open(long position) { 433 434 File file; 435 FileOutputStream os = null; 436 FileChannel fileChannel = null; 437 438 try { 439 file = resource.getFile(); 440 FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput); 441 Assert.state(resource.exists(), "Output resource must exist"); 442 os = new FileOutputStream(file, true); 443 fileChannel = os.getChannel(); 444 channel = os.getChannel(); 445 setPosition(position); 446 } 447 catch (IOException ioe) { 448 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", ioe); 449 } 450 451 XMLOutputFactory outputFactory = createXmlOutputFactory(); 452 453 if (outputFactory.isPropertySupported("com.ctc.wstx.automaticEndElements")) { 454 // If the current XMLOutputFactory implementation is supplied by 455 // Woodstox >= 3.2.9 we want to disable its 456 // automatic end element feature (see: 457 // https://jira.codehaus.org/browse/WSTX-165) per 458 // https://jira.spring.io/browse/BATCH-761). 459 outputFactory.setProperty("com.ctc.wstx.automaticEndElements", Boolean.FALSE); 460 } 461 if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) { 462 // On restart we don't write the root element so we have to disable 463 // structural validation (see: 464 // https://jira.spring.io/browse/BATCH-1681). 465 outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE); 466 } 467 468 try { 469 final FileChannel channel = fileChannel; 470 if (transactional) { 471 TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, new Runnable() { 472 @Override 473 public void run() { 474 closeStream(); 475 } 476 }); 477 478 writer.setEncoding(encoding); 479 writer.setForceSync(forceSync); 480 bufferedWriter = writer; 481 } 482 else { 483 bufferedWriter = new BufferedWriter(new OutputStreamWriter(os, encoding)); 484 } 485 delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter); 486 eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter); 487 initNamespaceContext(delegateEventWriter); 488 if (!restarted) { 489 startDocument(delegateEventWriter); 490 if (forceSync) { 491 channel.force(false); 492 } 493 } 494 } 495 catch (XMLStreamException xse) { 496 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", xse); 497 } 498 catch (UnsupportedEncodingException e) { 499 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource 500 + "] with encoding=[" + encoding + "]", e); 501 } 502 catch (IOException e) { 503 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e); 504 } 505 } 506 507 /** 508 * Subclasses can override to customize the writer. 509 * 510 * @param outputFactory the factory to be used to create an {@link XMLEventWriter}. 511 * @param writer the {@link Writer} to be used by the {@link XMLEventWriter} for 512 * writing to character streams. 513 * @return an xml writer 514 * 515 * @throws XMLStreamException thrown if error occured creating {@link XMLEventWriter}. 516 */ 517 protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory, Writer writer) 518 throws XMLStreamException { 519 return outputFactory.createXMLEventWriter(writer); 520 } 521 522 /** 523 * Subclasses can override to customize the factory. 524 * 525 * @return a factory for the xml output 526 * 527 * @throws FactoryConfigurationError throw if an instance of this factory cannot be loaded. 528 */ 529 protected XMLOutputFactory createXmlOutputFactory() throws FactoryConfigurationError { 530 return XMLOutputFactory.newInstance(); 531 } 532 533 /** 534 * Subclasses can override to customize the event factory. 535 * 536 * @return a factory for the xml events 537 * 538 * @throws FactoryConfigurationError thrown if an instance of this factory cannot be loaded. 539 */ 540 protected XMLEventFactory createXmlEventFactory() throws FactoryConfigurationError { 541 XMLEventFactory factory = XMLEventFactory.newInstance(); 542 return factory; 543 } 544 545 /** 546 * Subclasses can override to customize the STAX result. 547 * 548 * @return a result for writing to 549 */ 550 protected Result createStaxResult() { 551 return StaxUtils.getResult(eventWriter); 552 } 553 554 /** 555 * Inits the namespace context of the XMLEventWriter: 556 * <ul> 557 * <li>rootTagNamespacePrefix for rootTagName</li> 558 * <li>any other xmlns namespace prefix declarations in the root element attributes</li> 559 * </ul> 560 * 561 * @param writer XML event writer 562 * 563 * @throws XMLStreamException thrown if error occurs while setting the 564 * prefix or default name space. 565 */ 566 protected void initNamespaceContext(XMLEventWriter writer) throws XMLStreamException { 567 if (StringUtils.hasText(getRootTagNamespace())) { 568 if(StringUtils.hasText(getRootTagNamespacePrefix())) { 569 writer.setPrefix(getRootTagNamespacePrefix(), getRootTagNamespace()); 570 } else { 571 writer.setDefaultNamespace(getRootTagNamespace()); 572 } 573 } 574 if (!CollectionUtils.isEmpty(getRootElementAttributes())) { 575 for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) { 576 String key = entry.getKey(); 577 if (key.startsWith("xmlns")) { 578 String prefix = ""; 579 if (key.contains(":")) { 580 prefix = key.substring(key.indexOf(":") + 1); 581 } 582 if (log.isDebugEnabled()) { 583 log.debug("registering prefix: " +prefix + "=" + entry.getValue()); 584 } 585 writer.setPrefix(prefix, entry.getValue()); 586 } 587 } 588 } 589 } 590 591 /** 592 * Writes simple XML header containing: 593 * <ul> 594 * <li>xml declaration - defines encoding and XML version</li> 595 * <li>opening tag of the root element and its attributes</li> 596 * </ul> 597 * If this is not sufficient for you, simply override this method. Encoding, 598 * version and root tag name can be retrieved with corresponding getters. 599 * 600 * @param writer XML event writer 601 * 602 * @throws XMLStreamException thrown if error occurs. 603 */ 604 protected void startDocument(XMLEventWriter writer) throws XMLStreamException { 605 606 XMLEventFactory factory = createXmlEventFactory(); 607 608 // write start document 609 writer.add(factory.createStartDocument(getEncoding(), getVersion())); 610 611 // write root tag 612 writer.add(factory.createStartElement(getRootTagNamespacePrefix(), getRootTagNamespace(), getRootTagName())); 613 if (StringUtils.hasText(getRootTagNamespace())) { 614 if (StringUtils.hasText(getRootTagNamespacePrefix())) { 615 writer.add(factory.createNamespace(getRootTagNamespacePrefix(), getRootTagNamespace())); 616 } 617 else { 618 writer.add(factory.createNamespace(getRootTagNamespace())); 619 } 620 } 621 622 // write root tag attributes 623 if (!CollectionUtils.isEmpty(getRootElementAttributes())) { 624 625 for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) { 626 String key = entry.getKey(); 627 if (key.startsWith("xmlns")) { 628 String prefix = ""; 629 if (key.contains(":")) { 630 prefix = key.substring(key.indexOf(":") + 1); 631 } 632 writer.add(factory.createNamespace(prefix, entry.getValue())); 633 } 634 else { 635 writer.add(factory.createAttribute(key, entry.getValue())); 636 } 637 } 638 639 } 640 641 /* 642 * This forces the flush to write the end of the root element and avoids 643 * an off-by-one error on restart. 644 */ 645 writer.add(factory.createIgnorableSpace("")); 646 writer.flush(); 647 648 } 649 650 /** 651 * Writes the EndDocument tag manually. 652 * 653 * @param writer XML event writer 654 * 655 * @throws XMLStreamException thrown if error occurs. 656 */ 657 protected void endDocument(XMLEventWriter writer) throws XMLStreamException { 658 659 // writer.writeEndDocument(); <- this doesn't work after restart 660 // we need to write end tag of the root element manually 661 662 String nsPrefix = !StringUtils.hasText(getRootTagNamespacePrefix()) ? "" : getRootTagNamespacePrefix() + ":"; 663 try { 664 bufferedWriter.write("</" + nsPrefix + getRootTagName() + ">"); 665 } 666 catch (IOException ioe) { 667 throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe); 668 } 669 } 670 671 /** 672 * Flush and close the output source. 673 * 674 * @see org.springframework.batch.item.ItemStream#close() 675 */ 676 @Override 677 public void close() { 678 super.close(); 679 680 XMLEventFactory factory = createXmlEventFactory(); 681 try { 682 delegateEventWriter.add(factory.createCharacters("")); 683 } 684 catch (XMLStreamException e) { 685 log.error(e); 686 } 687 688 try { 689 if (footerCallback != null) { 690 XMLEventWriter footerCallbackWriter = delegateEventWriter; 691 if (restarted && !unclosedHeaderCallbackElements.isEmpty()) { 692 footerCallbackWriter = new UnopenedElementClosingEventWriter( 693 delegateEventWriter, bufferedWriter, unclosedHeaderCallbackElements); 694 } 695 footerCallback.write(footerCallbackWriter); 696 } 697 delegateEventWriter.flush(); 698 endDocument(delegateEventWriter); 699 } 700 catch (IOException e) { 701 throw new ItemStreamException("Failed to write footer items", e); 702 } 703 catch (XMLStreamException e) { 704 throw new ItemStreamException("Failed to write end document tag", e); 705 } 706 finally { 707 708 try { 709 delegateEventWriter.close(); 710 } 711 catch (XMLStreamException e) { 712 log.error("Unable to close file resource: [" + resource + "] " + e); 713 } 714 finally { 715 try { 716 bufferedWriter.close(); 717 } 718 catch (IOException e) { 719 log.error("Unable to close file resource: [" + resource + "] " + e); 720 } 721 finally { 722 if (!transactional) { 723 closeStream(); 724 } 725 } 726 } 727 if (currentRecordCount == 0 && shouldDeleteIfEmpty) { 728 try { 729 resource.getFile().delete(); 730 } 731 catch (IOException e) { 732 throw new ItemStreamException("Failed to delete empty file on close", e); 733 } 734 } 735 } 736 737 this.initialized = false; 738 } 739 740 private void closeStream() { 741 try { 742 channel.close(); 743 } 744 catch (IOException ioe) { 745 log.error("Unable to close file resource: [" + resource + "] " + ioe); 746 } 747 } 748 749 /** 750 * Write the value objects and flush them to the file. 751 * 752 * @param items the value object 753 * 754 * @throws IOException thrown if general error occurs. 755 * @throws XmlMappingException thrown if error occurs during XML Mapping. 756 */ 757 @Override 758 public void write(List<? extends T> items) throws XmlMappingException, IOException { 759 760 if(!this.initialized) { 761 throw new WriterNotOpenException("Writer must be open before it can be written to"); 762 } 763 764 currentRecordCount += items.size(); 765 766 for (Object object : items) { 767 Assert.state(marshaller.supports(object.getClass()), 768 "Marshaller must support the class of the marshalled object"); 769 Result result = createStaxResult(); 770 marshaller.marshal(object, result); 771 } 772 try { 773 eventWriter.flush(); 774 if (forceSync) { 775 channel.force(false); 776 } 777 } 778 catch (XMLStreamException | IOException e) { 779 throw new WriteFailedException("Failed to flush the events", e); 780 } 781 } 782 783 /** 784 * Get the restart data. 785 * 786 * @param executionContext the batch context. 787 * 788 * @see org.springframework.batch.item.ItemStream#update(ExecutionContext) 789 */ 790 @Override 791 public void update(ExecutionContext executionContext) { 792 super.update(executionContext); 793 if (saveState) { 794 Assert.notNull(executionContext, "ExecutionContext must not be null"); 795 executionContext.putLong(getExecutionContextKey(RESTART_DATA_NAME), getPosition()); 796 executionContext.putLong(getExecutionContextKey(WRITE_STATISTICS_NAME), currentRecordCount); 797 if (!unclosedHeaderCallbackElements.isEmpty()) { 798 executionContext.put(getExecutionContextKey(UNCLOSED_HEADER_CALLBACK_ELEMENTS_NAME), 799 unclosedHeaderCallbackElements); 800 } 801 } 802 } 803 804 /* 805 * Get the actual position in file channel. This method flushes any buffered 806 * data before position is read. 807 * 808 * @return byte offset in file channel 809 */ 810 private long getPosition() { 811 812 long position; 813 814 try { 815 eventWriter.flush(); 816 position = channel.position(); 817 if (bufferedWriter instanceof TransactionAwareBufferedWriter) { 818 position += ((TransactionAwareBufferedWriter) bufferedWriter).getBufferSize(); 819 } 820 } 821 catch (Exception e) { 822 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e); 823 } 824 825 return position; 826 } 827 828 /** 829 * Set the file channel position. 830 * 831 * @param newPosition new file channel position 832 */ 833 private void setPosition(long newPosition) { 834 835 try { 836 channel.truncate(newPosition); 837 channel.position(newPosition); 838 } 839 catch (IOException e) { 840 throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e); 841 } 842 843 } 844 845}