001/*
002 * Copyright 2006-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.xml;
018
019import java.io.BufferedWriter;
020import java.io.File;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.OutputStreamWriter;
024import java.io.UnsupportedEncodingException;
025import java.io.Writer;
026import java.nio.channels.FileChannel;
027import java.util.Collections;
028import java.util.List;
029import java.util.Map;
030import javax.xml.namespace.QName;
031import javax.xml.stream.FactoryConfigurationError;
032import javax.xml.stream.XMLEventFactory;
033import javax.xml.stream.XMLEventWriter;
034import javax.xml.stream.XMLOutputFactory;
035import javax.xml.stream.XMLStreamException;
036import javax.xml.transform.Result;
037
038import org.apache.commons.logging.Log;
039import org.apache.commons.logging.LogFactory;
040
041import org.springframework.batch.item.ExecutionContext;
042import org.springframework.batch.item.ItemStreamException;
043import org.springframework.batch.item.ItemWriter;
044import org.springframework.batch.item.WriteFailedException;
045import org.springframework.batch.item.WriterNotOpenException;
046import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream;
047import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
048import org.springframework.batch.item.util.FileUtils;
049import org.springframework.batch.item.xml.stax.NoStartEndDocumentStreamWriter;
050import org.springframework.batch.item.xml.stax.UnclosedElementCollectingEventWriter;
051import org.springframework.batch.item.xml.stax.UnopenedElementClosingEventWriter;
052import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter;
053import org.springframework.beans.factory.InitializingBean;
054import org.springframework.core.io.Resource;
055import org.springframework.dao.DataAccessResourceFailureException;
056import org.springframework.oxm.Marshaller;
057import org.springframework.oxm.XmlMappingException;
058import org.springframework.util.Assert;
059import org.springframework.util.ClassUtils;
060import org.springframework.util.CollectionUtils;
061import org.springframework.util.StringUtils;
062
063/**
064 * An implementation of {@link ItemWriter} which uses StAX and
065 * {@link Marshaller} for serializing object to XML.
066 * 
067 * This item writer also provides restart, statistics and transaction features
068 * by implementing corresponding interfaces.
069 * 
070 * The implementation is <b>not</b> thread-safe.
071 * 
072 * @author Peter Zozom
073 * @author Robert Kasanicky
074 * @author Michael Minella
075 * 
076 */
077public class StaxEventItemWriter<T> extends AbstractItemStreamItemWriter<T> implements
078ResourceAwareItemWriterItemStream<T>, InitializingBean {
079
080        private static final Log log = LogFactory.getLog(StaxEventItemWriter.class);
081
082        // default encoding
083        public static final String DEFAULT_ENCODING = "UTF-8";
084
085        // default encoding
086        public static final String DEFAULT_XML_VERSION = "1.0";
087
088        // default root tag name
089        public static final String DEFAULT_ROOT_TAG_NAME = "root";
090
091        // restart data property name
092        private static final String RESTART_DATA_NAME = "position";
093
094        // unclosed header callback elements property name
095        private static final String UNCLOSED_HEADER_CALLBACK_ELEMENTS_NAME = "unclosedHeaderCallbackElements";
096        
097        // restart data property name
098        private static final String WRITE_STATISTICS_NAME = "record.count";
099
100        // file system resource
101        private Resource resource;
102
103        // xml marshaller
104        private Marshaller marshaller;
105
106        // encoding to be used while reading from the resource
107        private String encoding = DEFAULT_ENCODING;
108
109        // XML version
110        private String version = DEFAULT_XML_VERSION;
111
112        // name of the root tag
113        private String rootTagName = DEFAULT_ROOT_TAG_NAME;
114
115        // namespace prefix of the root tag
116        private String rootTagNamespacePrefix = "";
117
118        // namespace of the root tag
119        private String rootTagNamespace = "";
120
121        // root element attributes
122        private Map<String, String> rootElementAttributes = null;
123
124        // TRUE means, that output file will be overwritten if exists - default is
125        // TRUE
126        private boolean overwriteOutput = true;
127
128        // file channel
129        private FileChannel channel;
130
131        // wrapper for XML event writer that swallows StartDocument and EndDocument
132        // events
133        private XMLEventWriter eventWriter;
134
135        // XML event writer
136        private XMLEventWriter delegateEventWriter;
137
138        // current count of processed records
139        private long currentRecordCount = 0;
140
141        private boolean saveState = true;
142
143        private StaxWriterCallback headerCallback;
144
145        private StaxWriterCallback footerCallback;
146
147        private Writer bufferedWriter;
148
149        private boolean transactional = true;
150
151        private boolean forceSync;
152
153        private boolean shouldDeleteIfEmpty = false;
154        
155        private boolean restarted = false;
156
157        private boolean initialized = false;
158        
159        // List holding the QName of elements that were opened in the header callback, but not closed
160        private List<QName> unclosedHeaderCallbackElements = Collections.emptyList();
161
162        public StaxEventItemWriter() {
163                setExecutionContextName(ClassUtils.getShortName(StaxEventItemWriter.class));
164        }
165
166        /**
167         * Set output file.
168         * 
169         * @param resource the output file
170         */
171        @Override
172        public void setResource(Resource resource) {
173                this.resource = resource;
174        }
175
176        /**
177         * Set Object to XML marshaller.
178         * 
179         * @param marshaller the Object to XML marshaller
180         */
181        public void setMarshaller(Marshaller marshaller) {
182                this.marshaller = marshaller;
183        }
184
185        /**
186         * headerCallback is called before writing any items.
187         *
188         * @param headerCallback the {@link StaxWriterCallback} to be called prior to writing items.
189         */
190        public void setHeaderCallback(StaxWriterCallback headerCallback) {
191                this.headerCallback = headerCallback;
192        }
193
194        /**
195         * footerCallback is called after writing all items but before closing the
196         * file.
197         *
198         *@param footerCallback the {@link StaxWriterCallback} to be called after writing items.
199         */
200        public void setFooterCallback(StaxWriterCallback footerCallback) {
201                this.footerCallback = footerCallback;
202        }
203
204        /**
205         * Flag to indicate that writes should be deferred to the end of a
206         * transaction if present. Defaults to true.
207         * 
208         * @param transactional the flag to set
209         */
210        public void setTransactional(boolean transactional) {
211                this.transactional = transactional;
212        }
213
214        /**
215         * Flag to indicate that changes should be force-synced to disk on flush.
216         * Defaults to false, which means that even with a local disk changes could
217         * be lost if the OS crashes in between a write and a cache flush. Setting
218         * to true may result in slower performance for usage patterns involving
219         * many frequent writes.
220         * 
221         * @param forceSync the flag value to set
222         */
223        public void setForceSync(boolean forceSync) {
224                this.forceSync = forceSync;
225        }
226
227        /**
228         * Flag to indicate that the target file should be deleted if no items have
229         * been written (other than header and footer) on close. Defaults to false.
230         * 
231         * @param shouldDeleteIfEmpty the flag value to set
232         */
233        public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) {
234                this.shouldDeleteIfEmpty = shouldDeleteIfEmpty;
235        }
236
237        /**
238         * Get used encoding.
239         * 
240         * @return the encoding used
241         */
242        public String getEncoding() {
243                return encoding;
244        }
245
246        /**
247         * Set encoding to be used for output file.
248         * 
249         * @param encoding the encoding to be used
250         */
251        public void setEncoding(String encoding) {
252                this.encoding = encoding;
253        }
254
255        /**
256         * Get XML version.
257         *
258         * @return the XML version used
259         */
260        public String getVersion() {
261                return version;
262        }
263
264        /**
265         * Set XML version to be used for output XML.
266         * 
267         * @param version the XML version to be used
268         */
269        public void setVersion(String version) {
270                this.version = version;
271        }
272
273        /**
274         * Get the tag name of the root element.
275         * 
276         * @return the root element tag name
277         */
278        public String getRootTagName() {
279                return rootTagName;
280        }
281
282        /**
283         * Set the tag name of the root element. If not set, default name is used
284         * ("root"). Namespace URI and prefix can also be set optionally using the
285         * notation:
286         * 
287         * <pre>
288         * {uri}prefix:root
289         * </pre>
290         * 
291         * The prefix is optional (defaults to empty), but if it is specified then
292         * the uri must be provided. In addition you might want to declare other
293         * namespaces using the {@link #setRootElementAttributes(Map) root
294         * attributes}.
295         * 
296         * @param rootTagName the tag name to be used for the root element
297         */
298        public void setRootTagName(String rootTagName) {
299                this.rootTagName = rootTagName;
300        }
301
302        /**
303         * Get the namespace prefix of the root element. Empty by default.
304         * 
305         * @return the rootTagNamespacePrefix
306         */
307        public String getRootTagNamespacePrefix() {
308                return rootTagNamespacePrefix;
309        }
310
311        /**
312         * Get the namespace of the root element.
313         * 
314         * @return the rootTagNamespace
315         */
316        public String getRootTagNamespace() {
317                return rootTagNamespace;
318        }
319
320        /**
321         * Get attributes of the root element.
322         * 
323         * @return attributes of the root element
324         */
325        public Map<String, String> getRootElementAttributes() {
326                return rootElementAttributes;
327        }
328
329        /**
330         * Set the root element attributes to be written. If any of the key names
331         * begin with "xmlns:" then they are treated as namespace declarations.
332         * 
333         * @param rootElementAttributes attributes of the root element
334         */
335        public void setRootElementAttributes(Map<String, String> rootElementAttributes) {
336                this.rootElementAttributes = rootElementAttributes;
337        }
338
339        /**
340         * Set "overwrite" flag for the output file. Flag is ignored when output
341         * file processing is restarted.
342         * 
343         * @param overwriteOutput If set to true, output file will be overwritten
344         * (this flag is ignored when processing is restart).
345         */
346        public void setOverwriteOutput(boolean overwriteOutput) {
347                this.overwriteOutput = overwriteOutput;
348        }
349
350        public void setSaveState(boolean saveState) {
351                this.saveState = saveState;
352        }
353
354        /**
355         * @throws Exception thrown if error occurs
356         * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
357         */
358        @Override
359        public void afterPropertiesSet() throws Exception {
360                Assert.notNull(marshaller, "A Marshaller is required");
361                if (rootTagName.contains("{")) {
362                        rootTagNamespace = rootTagName.replaceAll("\\{(.*)\\}.*", "$1");
363                        rootTagName = rootTagName.replaceAll("\\{.*\\}(.*)", "$1");
364                        if (rootTagName.contains(":")) {
365                                rootTagNamespacePrefix = rootTagName.replaceAll("(.*):.*", "$1");
366                                rootTagName = rootTagName.replaceAll(".*:(.*)", "$1");
367                        }
368                }
369        }
370
371        /**
372         * Open the output source
373         *
374         * @param executionContext the batch context.
375         * 
376         * @see org.springframework.batch.item.ItemStream#open(ExecutionContext)
377         */
378        @SuppressWarnings("unchecked")
379        @Override
380        public void open(ExecutionContext executionContext) {
381                super.open(executionContext);
382
383                Assert.notNull(resource, "The resource must be set");
384
385                long startAtPosition = 0;
386                
387                // if restart data is provided, restart from provided offset
388                // otherwise start from beginning
389                if (executionContext.containsKey(getExecutionContextKey(RESTART_DATA_NAME))) {
390                        startAtPosition = executionContext.getLong(getExecutionContextKey(RESTART_DATA_NAME));
391                        currentRecordCount = executionContext.getLong(getExecutionContextKey(WRITE_STATISTICS_NAME));
392                        if (executionContext.containsKey(getExecutionContextKey(UNCLOSED_HEADER_CALLBACK_ELEMENTS_NAME))) {
393                                unclosedHeaderCallbackElements = (List<QName>) executionContext
394                                                .get(getExecutionContextKey(UNCLOSED_HEADER_CALLBACK_ELEMENTS_NAME));
395                        }
396                        
397                        restarted = true;
398                        if (shouldDeleteIfEmpty && currentRecordCount == 0) {
399                                // previous execution deleted the output file because no items were written
400                                restarted = false;
401                                startAtPosition = 0;
402                        } else {
403                                restarted = true;
404                        }
405                } else {
406                        currentRecordCount = 0;
407                        restarted = false;
408                }
409
410                open(startAtPosition);
411
412                if (startAtPosition == 0) {
413                        try {
414                                if (headerCallback != null) {
415                                        UnclosedElementCollectingEventWriter headerCallbackWriter = new UnclosedElementCollectingEventWriter(delegateEventWriter);
416                                        headerCallback.write(headerCallbackWriter);
417                                        unclosedHeaderCallbackElements = headerCallbackWriter.getUnclosedElements();
418                                }
419                        }
420                        catch (IOException e) {
421                                throw new ItemStreamException("Failed to write headerItems", e);
422                        }
423                }
424
425                this.initialized = true;
426
427        }
428
429        /**
430         * Helper method for opening output source at given file position
431         */
432        private void open(long position) {
433
434                File file;
435                FileOutputStream os = null;
436                FileChannel fileChannel = null;
437
438                try {
439                        file = resource.getFile();
440                        FileUtils.setUpOutputFile(file, restarted, false, overwriteOutput);
441                        Assert.state(resource.exists(), "Output resource must exist");
442                        os = new FileOutputStream(file, true);
443                        fileChannel = os.getChannel();
444                        channel = os.getChannel();
445                        setPosition(position);
446                }
447                catch (IOException ioe) {
448                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", ioe);
449                }
450
451                XMLOutputFactory outputFactory = createXmlOutputFactory();
452
453                if (outputFactory.isPropertySupported("com.ctc.wstx.automaticEndElements")) {
454                        // If the current XMLOutputFactory implementation is supplied by
455                        // Woodstox >= 3.2.9 we want to disable its
456                        // automatic end element feature (see:
457                        // https://jira.codehaus.org/browse/WSTX-165) per
458                        // https://jira.spring.io/browse/BATCH-761).
459                        outputFactory.setProperty("com.ctc.wstx.automaticEndElements", Boolean.FALSE);
460                }
461                if (outputFactory.isPropertySupported("com.ctc.wstx.outputValidateStructure")) {
462                        // On restart we don't write the root element so we have to disable
463                        // structural validation (see:
464                        // https://jira.spring.io/browse/BATCH-1681).
465                        outputFactory.setProperty("com.ctc.wstx.outputValidateStructure", Boolean.FALSE);
466                }
467
468                try {
469                        final FileChannel channel = fileChannel;
470                        if (transactional) {
471                                TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, new Runnable() {
472                                        @Override
473                                        public void run() {
474                                                closeStream();
475                                        }
476                                });
477
478                                writer.setEncoding(encoding);
479                                writer.setForceSync(forceSync);
480                                bufferedWriter = writer;
481                        }
482                        else {
483                                bufferedWriter =  new BufferedWriter(new OutputStreamWriter(os, encoding));
484                        }
485                        delegateEventWriter = createXmlEventWriter(outputFactory, bufferedWriter);
486                        eventWriter = new NoStartEndDocumentStreamWriter(delegateEventWriter);
487                        initNamespaceContext(delegateEventWriter);
488                        if (!restarted) {
489                                startDocument(delegateEventWriter);
490                                if (forceSync) {
491                                        channel.force(false);
492                                }
493                        }
494                }
495                catch (XMLStreamException xse) {
496                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", xse);
497                }
498                catch (UnsupportedEncodingException e) {
499                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource
500                                        + "] with encoding=[" + encoding + "]", e);
501                } 
502                catch (IOException e) {
503                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
504                }
505        }
506
507        /**
508         * Subclasses can override to customize the writer.
509         *
510         * @param outputFactory the factory to be used to create an {@link XMLEventWriter}.
511         * @param writer the {@link Writer} to be used by the {@link XMLEventWriter} for
512         * writing to character streams.
513         * @return an xml writer
514         *
515         * @throws XMLStreamException thrown if error occured creating {@link XMLEventWriter}.
516         */
517        protected XMLEventWriter createXmlEventWriter(XMLOutputFactory outputFactory, Writer writer)
518                        throws XMLStreamException {
519                return outputFactory.createXMLEventWriter(writer);
520        }
521
522        /**
523         * Subclasses can override to customize the factory.
524         *
525         * @return a factory for the xml output
526         *
527         * @throws FactoryConfigurationError throw if an instance of this factory cannot be loaded.
528         */
529        protected XMLOutputFactory createXmlOutputFactory() throws FactoryConfigurationError {
530                return XMLOutputFactory.newInstance();
531        }
532
533        /**
534         * Subclasses can override to customize the event factory.
535         *
536         * @return a factory for the xml events
537         *
538         * @throws FactoryConfigurationError thrown if an instance of this factory cannot be loaded.
539         */
540        protected XMLEventFactory createXmlEventFactory() throws FactoryConfigurationError {
541                XMLEventFactory factory = XMLEventFactory.newInstance();
542                return factory;
543        }
544
545        /**
546         * Subclasses can override to customize the STAX result.
547         *
548         * @return a result for writing to
549         */
550        protected Result createStaxResult() {
551                return StaxUtils.getResult(eventWriter);
552        }
553
554        /**
555         * Inits the namespace context of the XMLEventWriter:
556         * <ul>
557         * <li>rootTagNamespacePrefix for rootTagName</li>
558         * <li>any other xmlns namespace prefix declarations in the root element attributes</li>
559         * </ul>
560         * 
561         * @param writer XML event writer
562         *
563         * @throws XMLStreamException thrown if error occurs while setting the
564         * prefix or default name space.
565         */
566        protected void initNamespaceContext(XMLEventWriter writer) throws XMLStreamException {
567                if (StringUtils.hasText(getRootTagNamespace())) {
568                        if(StringUtils.hasText(getRootTagNamespacePrefix())) {
569                                writer.setPrefix(getRootTagNamespacePrefix(), getRootTagNamespace());
570                        } else {
571                                writer.setDefaultNamespace(getRootTagNamespace());
572                        }
573                }
574                if (!CollectionUtils.isEmpty(getRootElementAttributes())) {
575                        for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) {
576                                String key = entry.getKey();
577                                if (key.startsWith("xmlns")) {
578                                        String prefix = "";
579                                        if (key.contains(":")) {
580                                                prefix = key.substring(key.indexOf(":") + 1);
581                                        }
582                                        if (log.isDebugEnabled()) {
583                                                log.debug("registering prefix: " +prefix + "=" + entry.getValue());
584                                        }
585                                        writer.setPrefix(prefix, entry.getValue());
586                                }
587                        }
588                }
589        }
590
591        /**
592         * Writes simple XML header containing:
593         * <ul>
594         * <li>xml declaration - defines encoding and XML version</li>
595         * <li>opening tag of the root element and its attributes</li>
596         * </ul>
597         * If this is not sufficient for you, simply override this method. Encoding,
598         * version and root tag name can be retrieved with corresponding getters.
599         * 
600         * @param writer XML event writer
601         *
602         * @throws XMLStreamException thrown if error occurs.
603         */
604        protected void startDocument(XMLEventWriter writer) throws XMLStreamException {
605
606                XMLEventFactory factory = createXmlEventFactory();
607
608                // write start document
609                writer.add(factory.createStartDocument(getEncoding(), getVersion()));
610
611                // write root tag
612                writer.add(factory.createStartElement(getRootTagNamespacePrefix(), getRootTagNamespace(), getRootTagName()));
613                if (StringUtils.hasText(getRootTagNamespace())) {
614                        if (StringUtils.hasText(getRootTagNamespacePrefix())) {
615                                writer.add(factory.createNamespace(getRootTagNamespacePrefix(), getRootTagNamespace()));
616                        }
617                        else {
618                                writer.add(factory.createNamespace(getRootTagNamespace()));
619                        }
620                }
621
622                // write root tag attributes
623                if (!CollectionUtils.isEmpty(getRootElementAttributes())) {
624
625                        for (Map.Entry<String, String> entry : getRootElementAttributes().entrySet()) {
626                                String key = entry.getKey();
627                                if (key.startsWith("xmlns")) {
628                                        String prefix = "";
629                                        if (key.contains(":")) {
630                                                prefix = key.substring(key.indexOf(":") + 1);
631                                        }
632                                        writer.add(factory.createNamespace(prefix, entry.getValue()));
633                                }
634                                else {
635                                        writer.add(factory.createAttribute(key, entry.getValue()));
636                                }
637                        }
638
639                }
640
641                /*
642                 * This forces the flush to write the end of the root element and avoids
643                 * an off-by-one error on restart.
644                 */
645                writer.add(factory.createIgnorableSpace(""));
646                writer.flush();
647
648        }
649
650        /**
651         * Writes the EndDocument tag manually.
652         * 
653         * @param writer XML event writer
654         *
655         * @throws XMLStreamException thrown if error occurs.
656         */
657        protected void endDocument(XMLEventWriter writer) throws XMLStreamException {
658
659                // writer.writeEndDocument(); <- this doesn't work after restart
660                // we need to write end tag of the root element manually
661
662                String nsPrefix = !StringUtils.hasText(getRootTagNamespacePrefix()) ? "" : getRootTagNamespacePrefix() + ":";
663                try {
664                        bufferedWriter.write("</" + nsPrefix + getRootTagName() + ">");
665                }
666                catch (IOException ioe) {
667                        throw new DataAccessResourceFailureException("Unable to close file resource: [" + resource + "]", ioe);
668                }
669        }
670
671        /**
672         * Flush and close the output source.
673         * 
674         * @see org.springframework.batch.item.ItemStream#close()
675         */
676        @Override
677        public void close() {
678                super.close();
679
680                XMLEventFactory factory = createXmlEventFactory();
681                try {
682                        delegateEventWriter.add(factory.createCharacters(""));
683                }
684                catch (XMLStreamException e) {
685                        log.error(e);
686                }
687
688                try {
689                        if (footerCallback != null) {
690                                XMLEventWriter footerCallbackWriter = delegateEventWriter;
691                                if (restarted && !unclosedHeaderCallbackElements.isEmpty()) {
692                                        footerCallbackWriter = new UnopenedElementClosingEventWriter(
693                                                        delegateEventWriter, bufferedWriter, unclosedHeaderCallbackElements);
694                                } 
695                                footerCallback.write(footerCallbackWriter);
696                        }
697                        delegateEventWriter.flush();
698                        endDocument(delegateEventWriter);
699                }
700                catch (IOException e) {
701                        throw new ItemStreamException("Failed to write footer items", e);
702                }
703                catch (XMLStreamException e) {
704                        throw new ItemStreamException("Failed to write end document tag", e);
705                }
706                finally {
707
708                        try {
709                                delegateEventWriter.close();
710                        }
711                        catch (XMLStreamException e) {
712                                log.error("Unable to close file resource: [" + resource + "] " + e);
713                        }
714                        finally {
715                                try {
716                                        bufferedWriter.close();
717                                }
718                                catch (IOException e) {
719                                        log.error("Unable to close file resource: [" + resource + "] " + e);
720                                }
721                                finally {
722                                        if (!transactional) {
723                                                closeStream();
724                                        }
725                                }
726                        }
727                        if (currentRecordCount == 0 && shouldDeleteIfEmpty) {
728                                try {
729                                        resource.getFile().delete();
730                                }
731                                catch (IOException e) {
732                                        throw new ItemStreamException("Failed to delete empty file on close", e);
733                                }
734                        }
735                }
736
737                this.initialized = false;
738        }
739
740        private void closeStream() {
741                try {
742                        channel.close();
743                }
744                catch (IOException ioe) {
745                        log.error("Unable to close file resource: [" + resource + "] " + ioe);
746                }
747        }
748
749        /**
750         * Write the value objects and flush them to the file.
751         * 
752         * @param items the value object
753         *
754         * @throws IOException thrown if general error occurs.
755         * @throws XmlMappingException thrown if error occurs during XML Mapping.
756         */
757        @Override
758        public void write(List<? extends T> items) throws XmlMappingException, IOException {
759
760                if(!this.initialized) {
761                        throw new WriterNotOpenException("Writer must be open before it can be written to");
762                }
763
764                currentRecordCount += items.size();
765
766                for (Object object : items) {
767                        Assert.state(marshaller.supports(object.getClass()),
768                                        "Marshaller must support the class of the marshalled object");
769                        Result result = createStaxResult();
770                        marshaller.marshal(object, result);
771                }
772                try {
773                        eventWriter.flush();
774                        if (forceSync) {
775                                channel.force(false);
776                        }                       
777                }
778                catch (XMLStreamException | IOException e) {
779                        throw new WriteFailedException("Failed to flush the events", e);
780                } 
781        }
782
783        /**
784         * Get the restart data.
785         *
786         * @param executionContext the batch context.
787         * 
788         * @see org.springframework.batch.item.ItemStream#update(ExecutionContext)
789         */
790        @Override
791        public void update(ExecutionContext executionContext) {
792                super.update(executionContext);
793                if (saveState) {
794                        Assert.notNull(executionContext, "ExecutionContext must not be null");
795                        executionContext.putLong(getExecutionContextKey(RESTART_DATA_NAME), getPosition());
796                        executionContext.putLong(getExecutionContextKey(WRITE_STATISTICS_NAME), currentRecordCount);
797                        if (!unclosedHeaderCallbackElements.isEmpty()) {
798                                executionContext.put(getExecutionContextKey(UNCLOSED_HEADER_CALLBACK_ELEMENTS_NAME),
799                                                unclosedHeaderCallbackElements);
800                        }                       
801                }
802        }
803
804        /*
805         * Get the actual position in file channel. This method flushes any buffered
806         * data before position is read.
807         * 
808         * @return byte offset in file channel
809         */
810        private long getPosition() {
811
812                long position;
813
814                try {
815                        eventWriter.flush();
816                        position = channel.position();
817                        if (bufferedWriter instanceof TransactionAwareBufferedWriter) {
818                                position += ((TransactionAwareBufferedWriter) bufferedWriter).getBufferSize();
819                        }
820                }
821                catch (Exception e) {
822                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
823                }
824
825                return position;
826        }
827
828        /**
829         * Set the file channel position.
830         * 
831         * @param newPosition new file channel position
832         */
833        private void setPosition(long newPosition) {
834
835                try {
836                        channel.truncate(newPosition);
837                        channel.position(newPosition);
838                }
839                catch (IOException e) {
840                        throw new DataAccessResourceFailureException("Unable to write to file resource: [" + resource + "]", e);
841                }
842
843        }
844
845}