001/*
002 * Copyright 2006-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *       https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.support;
018
019import java.io.BufferedWriter;
020import java.io.File;
021import java.io.FileOutputStream;
022import java.io.IOException;
023import java.io.Writer;
024import java.nio.channels.Channels;
025import java.nio.channels.FileChannel;
026import java.nio.charset.UnsupportedCharsetException;
027import java.util.List;
028
029import org.apache.commons.logging.Log;
030import org.apache.commons.logging.LogFactory;
031
032import org.springframework.batch.item.ExecutionContext;
033import org.springframework.batch.item.ItemStream;
034import org.springframework.batch.item.ItemStreamException;
035import org.springframework.batch.item.WriteFailedException;
036import org.springframework.batch.item.WriterNotOpenException;
037import org.springframework.batch.item.file.FlatFileFooterCallback;
038import org.springframework.batch.item.file.FlatFileHeaderCallback;
039import org.springframework.batch.item.file.ResourceAwareItemWriterItemStream;
040import org.springframework.batch.item.util.FileUtils;
041import org.springframework.batch.support.transaction.TransactionAwareBufferedWriter;
042import org.springframework.beans.factory.InitializingBean;
043import org.springframework.core.io.Resource;
044import org.springframework.util.Assert;
045
046/**
047 * Base class for item writers that write data to a file or stream.
048 * This class provides common features like restart, force sync, append etc.
049 * The location of the output file is defined by a {@link Resource} which must
050 * represent a writable file.<br>
051 * 
052 * Uses buffered writer to improve performance.<br>
053 * 
054 * The implementation is <b>not</b> thread-safe.
055 * 
056 * @author Waseem Malik
057 * @author Tomas Slanina
058 * @author Robert Kasanicky
059 * @author Dave Syer
060 * @author Michael Minella
061 * @author Mahmoud Ben Hassine
062 *
063 * @since 4.1
064 */
065public abstract class AbstractFileItemWriter<T> extends AbstractItemStreamItemWriter<T>
066                implements ResourceAwareItemWriterItemStream<T>, InitializingBean {
067
068        public static final boolean DEFAULT_TRANSACTIONAL = true;
069
070        protected static final Log logger = LogFactory.getLog(AbstractFileItemWriter.class);
071
072        public static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
073
074        // default encoding for writing to output files - set to UTF-8.
075        public static final String DEFAULT_CHARSET = "UTF-8";
076
077        private static final String WRITTEN_STATISTICS_NAME = "written";
078
079        private static final String RESTART_DATA_NAME = "current.count";
080
081        private Resource resource;
082
083        protected OutputState state = null;
084
085        private boolean saveState = true;
086
087        private boolean forceSync = false;
088
089        protected boolean shouldDeleteIfExists = true;
090
091        private boolean shouldDeleteIfEmpty = false;
092
093        private String encoding = DEFAULT_CHARSET;
094
095        private FlatFileHeaderCallback headerCallback;
096
097        private FlatFileFooterCallback footerCallback;
098
099        protected String lineSeparator = DEFAULT_LINE_SEPARATOR;
100
101        private boolean transactional = DEFAULT_TRANSACTIONAL;
102
103        protected boolean append = false;
104
105        /**
106         * Flag to indicate that changes should be force-synced to disk on flush.
107         * Defaults to false, which means that even with a local disk changes could
108         * be lost if the OS crashes in between a write and a cache flush. Setting
109         * to true may result in slower performance for usage patterns involving many
110         * frequent writes.
111         * 
112         * @param forceSync the flag value to set
113         */
114        public void setForceSync(boolean forceSync) {
115                this.forceSync = forceSync;
116        }
117
118        /**
119         * Public setter for the line separator. Defaults to the System property
120         * line.separator.
121         * @param lineSeparator the line separator to set
122         */
123        public void setLineSeparator(String lineSeparator) {
124                this.lineSeparator = lineSeparator;
125        }
126
127        /**
128         * Setter for resource. Represents a file that can be written.
129         * 
130         * @param resource the resource to be written to
131         */
132        @Override
133        public void setResource(Resource resource) {
134                this.resource = resource;
135        }
136
137        /**
138         * Sets encoding for output template.
139         *
140         * @param newEncoding {@link String} containing the encoding to be used for
141         * the writer.
142         */
143        public void setEncoding(String newEncoding) {
144                this.encoding = newEncoding;
145        }
146
147        /**
148         * Flag to indicate that the target file should be deleted if it already
149         * exists, otherwise it will be created. Defaults to true, so no appending
150         * except on restart. If set to false and {@link #setAppendAllowed(boolean)
151         * appendAllowed} is also false then there will be an exception when the
152         * stream is opened to prevent existing data being potentially corrupted.
153         * 
154         * @param shouldDeleteIfExists the flag value to set
155         */
156        public void setShouldDeleteIfExists(boolean shouldDeleteIfExists) {
157                this.shouldDeleteIfExists = shouldDeleteIfExists;
158        }
159
160        /**
161         * Flag to indicate that the target file should be appended if it already
162         * exists. If this flag is set then the flag
163         * {@link #setShouldDeleteIfExists(boolean) shouldDeleteIfExists} is
164         * automatically set to false, so that flag should not be set explicitly.
165         * Defaults value is false.
166         * 
167         * @param append the flag value to set
168         */
169        public void setAppendAllowed(boolean append) {
170                this.append = append;
171        }
172
173        /**
174         * Flag to indicate that the target file should be deleted if no lines have
175         * been written (other than header and footer) on close. Defaults to false.
176         * 
177         * @param shouldDeleteIfEmpty the flag value to set
178         */
179        public void setShouldDeleteIfEmpty(boolean shouldDeleteIfEmpty) {
180                this.shouldDeleteIfEmpty = shouldDeleteIfEmpty;
181        }
182
183        /**
184         * Set the flag indicating whether or not state should be saved in the
185         * provided {@link ExecutionContext} during the {@link ItemStream} call to
186         * update. Setting this to false means that it will always start at the
187         * beginning on a restart.
188         * 
189         * @param saveState if true, state will be persisted
190         */
191        public void setSaveState(boolean saveState) {
192                this.saveState = saveState;
193        }
194
195        /**
196         * headerCallback will be called before writing the first item to file.
197         * Newline will be automatically appended after the header is written.
198         *
199         * @param headerCallback {@link FlatFileHeaderCallback} to generate the header
200         *
201         */
202        public void setHeaderCallback(FlatFileHeaderCallback headerCallback) {
203                this.headerCallback = headerCallback;
204        }
205
206        /**
207         * footerCallback will be called after writing the last item to file, but
208         * before the file is closed.
209         *
210         * @param footerCallback {@link FlatFileFooterCallback} to generate the footer
211         *
212         */
213        public void setFooterCallback(FlatFileFooterCallback footerCallback) {
214                this.footerCallback = footerCallback;
215        }
216
217        /**
218         * Flag to indicate that writing to the buffer should be delayed if a
219         * transaction is active. Defaults to true.
220         *
221         * @param transactional true if writing to buffer should be delayed.
222         *
223         */
224        public void setTransactional(boolean transactional) {
225                this.transactional = transactional;
226        }
227
228        /**
229         * Writes out a string followed by a "new line", where the format of the new
230         * line separator is determined by the underlying operating system.
231         * 
232         * @param items list of items to be written to output stream
233         * @throws Exception if an error occurs while writing items to the output stream
234         */
235        @Override
236        public void write(List<? extends T> items) throws Exception {
237                if (!getOutputState().isInitialized()) {
238                        throw new WriterNotOpenException("Writer must be open before it can be written to");
239                }
240
241                if (logger.isDebugEnabled()) {
242                        logger.debug("Writing to file with " + items.size() + " items.");
243                }
244
245                OutputState state = getOutputState();
246
247                String lines = doWrite(items);
248                try {
249                        state.write(lines);
250                }
251                catch (IOException e) {
252                        throw new WriteFailedException("Could not write data. The file may be corrupt.", e);
253                }
254                state.setLinesWritten(state.getLinesWritten() + items.size());
255        }
256
257        /**
258         * Write out a string of items followed by a "new line", where the format of the new
259         * line separator is determined by the underlying operating system.
260         * @param items to be written
261         * @return written lines
262         */
263        protected abstract String doWrite(List<? extends T> items);
264
265        /**
266         * @see ItemStream#close()
267         */
268        @Override
269        public void close() {
270                super.close();
271                if (state != null) {
272                        try {
273                                if (footerCallback != null && state.outputBufferedWriter != null) {
274                                        footerCallback.writeFooter(state.outputBufferedWriter);
275                                        state.outputBufferedWriter.flush();
276                                }
277                        }
278                        catch (IOException e) {
279                                throw new ItemStreamException("Failed to write footer before closing", e);
280                        }
281                        finally {
282                                state.close();
283                                if (state.linesWritten == 0 && shouldDeleteIfEmpty) {
284                                        try {
285                                                resource.getFile().delete();
286                                        }
287                                        catch (IOException e) {
288                                                throw new ItemStreamException("Failed to delete empty file on close", e);
289                                        }
290                                }
291                                state = null;
292                        }
293                }
294        }
295
296        /**
297         * Initialize the reader. This method may be called multiple times before
298         * close is called.
299         * 
300         * @see ItemStream#open(ExecutionContext)
301         */
302        @Override
303        public void open(ExecutionContext executionContext) throws ItemStreamException {
304                super.open(executionContext);
305
306                Assert.notNull(resource, "The resource must be set");
307
308                if (!getOutputState().isInitialized()) {
309                        doOpen(executionContext);
310                }
311        }
312
313        private void doOpen(ExecutionContext executionContext) throws ItemStreamException {
314                OutputState outputState = getOutputState();
315                if (executionContext.containsKey(getExecutionContextKey(RESTART_DATA_NAME))) {
316                        outputState.restoreFrom(executionContext);
317                }
318                try {
319                        outputState.initializeBufferedWriter();
320                }
321                catch (IOException ioe) {
322                        throw new ItemStreamException("Failed to initialize writer", ioe);
323                }
324                if (outputState.lastMarkedByteOffsetPosition == 0 && !outputState.appending) {
325                        if (headerCallback != null) {
326                                try {
327                                        headerCallback.writeHeader(outputState.outputBufferedWriter);
328                                        outputState.write(lineSeparator);
329                                }
330                                catch (IOException e) {
331                                        throw new ItemStreamException("Could not write headers.  The file may be corrupt.", e);
332                                }
333                        }
334                }
335        }
336
337        /**
338         * @see ItemStream#update(ExecutionContext)
339         */
340        @Override
341        public void update(ExecutionContext executionContext) {
342                super.update(executionContext);
343                if (state == null) {
344                        throw new ItemStreamException("ItemStream not open or already closed.");
345                }
346
347                Assert.notNull(executionContext, "ExecutionContext must not be null");
348
349                if (saveState) {
350
351                        try {
352                                executionContext.putLong(getExecutionContextKey(RESTART_DATA_NAME), state.position());
353                        }
354                        catch (IOException e) {
355                                throw new ItemStreamException("ItemStream does not return current position properly", e);
356                        }
357
358                        executionContext.putLong(getExecutionContextKey(WRITTEN_STATISTICS_NAME), state.linesWritten);
359                }
360        }
361
362        // Returns object representing state.
363        protected OutputState getOutputState() {
364                if (state == null) {
365                        File file;
366                        try {
367                                file = resource.getFile();
368                        }
369                        catch (IOException e) {
370                                throw new ItemStreamException("Could not convert resource to file: [" + resource + "]", e);
371                        }
372                        Assert.state(!file.exists() || file.canWrite(), "Resource is not writable: [" + resource + "]");
373                        state = new OutputState();
374                        state.setDeleteIfExists(shouldDeleteIfExists);
375                        state.setAppendAllowed(append);
376                        state.setEncoding(encoding);
377                }
378                return state;
379        }
380
381        /**
382         * Encapsulates the runtime state of the writer. All state changing
383         * operations on the writer go through this class.
384         */
385        protected class OutputState {
386
387                private FileOutputStream os;
388
389                // The bufferedWriter over the file channel that is actually written
390                Writer outputBufferedWriter;
391
392                FileChannel fileChannel;
393
394                // this represents the charset encoding (if any is needed) for the
395                // output file
396                String encoding = DEFAULT_CHARSET;
397
398                boolean restarted = false;
399
400                long lastMarkedByteOffsetPosition = 0;
401
402                long linesWritten = 0;
403
404                boolean shouldDeleteIfExists = true;
405
406                boolean initialized = false;
407
408                private boolean append = false;
409
410                private boolean appending = false;
411
412                /**
413                 * Return the byte offset position of the cursor in the output file as a
414                 * long integer.
415                 */
416                public long position() throws IOException {
417                        long pos = 0;
418
419                        if (fileChannel == null) {
420                                return 0;
421                        }
422
423                        outputBufferedWriter.flush();
424                        pos = fileChannel.position();
425                        if (transactional) {
426                                pos += ((TransactionAwareBufferedWriter) outputBufferedWriter).getBufferSize();
427                        }
428
429                        return pos;
430
431                }
432
433                /**
434                 * @param append if true, append to previously created file
435                 */
436                public void setAppendAllowed(boolean append) {
437                        this.append = append;
438                }
439
440                /**
441                 * @param executionContext state from which to restore writing from
442                 */
443                public void restoreFrom(ExecutionContext executionContext) {
444                        lastMarkedByteOffsetPosition = executionContext.getLong(getExecutionContextKey(RESTART_DATA_NAME));
445                        linesWritten = executionContext.getLong(getExecutionContextKey(WRITTEN_STATISTICS_NAME));
446                        if (shouldDeleteIfEmpty && linesWritten == 0) {
447                                // previous execution deleted the output file because no items were written
448                                restarted = false;
449                                lastMarkedByteOffsetPosition = 0;
450                        } else {
451                                restarted = true;
452                        }
453                }
454
455                /**
456                 * @param shouldDeleteIfExists indicator
457                 */
458                public void setDeleteIfExists(boolean shouldDeleteIfExists) {
459                        this.shouldDeleteIfExists = shouldDeleteIfExists;
460                }
461
462                /**
463                 * @param encoding file encoding
464                 */
465                public void setEncoding(String encoding) {
466                        this.encoding = encoding;
467                }
468
469                public long getLinesWritten() {
470                        return linesWritten;
471                }
472
473                public void setLinesWritten(long linesWritten) {
474                        this.linesWritten = linesWritten;
475                }
476
477                /**
478                 * Close the open resource and reset counters.
479                 */
480                public void close() {
481
482                        initialized = false;
483                        restarted = false;
484                        try {
485                                if (outputBufferedWriter != null) {
486                                        outputBufferedWriter.close();
487                                }
488                        }
489                        catch (IOException ioe) {
490                                throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
491                        }
492                        finally {
493                                if (!transactional) {
494                                        closeStream();
495                                }
496                        }
497                }
498
499                private void closeStream() {
500                        try {
501                                if (fileChannel != null) {
502                                        fileChannel.close();
503                                }
504                        }
505                        catch (IOException ioe) {
506                                throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
507                        }
508                        finally {
509                                try {
510                                        if (os != null) {
511                                                os.close();
512                                        }
513                                }
514                                catch (IOException ioe) {
515                                        throw new ItemStreamException("Unable to close the the ItemWriter", ioe);
516                                }
517                        }
518                }
519
520                /**
521                 * @param line String to be written to the file
522                 * @throws IOException
523                 */
524                public void write(String line) throws IOException {
525                        if (!initialized) {
526                                initializeBufferedWriter();
527                        }
528
529                        outputBufferedWriter.write(line);
530                        outputBufferedWriter.flush();
531                }
532
533                /**
534                 * Truncate the output at the last known good point.
535                 * 
536                 * @throws IOException if unable to work with file
537                 */
538                public void truncate() throws IOException {
539                        fileChannel.truncate(lastMarkedByteOffsetPosition);
540                        fileChannel.position(lastMarkedByteOffsetPosition);
541                }
542
543                /**
544                 * Creates the buffered writer for the output file channel based on
545                 * configuration information.
546                 * @throws IOException if unable to initialize buffer
547                 */
548                private void initializeBufferedWriter() throws IOException {
549
550                        File file = resource.getFile();
551                        FileUtils.setUpOutputFile(file, restarted, append, shouldDeleteIfExists);
552
553                        os = new FileOutputStream(file.getAbsolutePath(), true);
554                        fileChannel = os.getChannel();
555
556                        outputBufferedWriter = getBufferedWriter(fileChannel, encoding);
557                        outputBufferedWriter.flush();
558
559                        if (append) {
560                                // Bug in IO library? This doesn't work...
561                                // lastMarkedByteOffsetPosition = fileChannel.position();
562                                if (file.length() > 0) {
563                                        appending = true;
564                                        // Don't write the headers again
565                                }
566                        }
567
568                        Assert.state(outputBufferedWriter != null,
569                                        "Unable to initialize buffered writer");
570                        // in case of restarting reset position to last committed point
571                        if (restarted) {
572                                checkFileSize();
573                                truncate();
574                        }
575
576                        initialized = true;
577                }
578
579                public boolean isInitialized() {
580                        return initialized;
581                }
582
583                /**
584                 * Returns the buffered writer opened to the beginning of the file
585                 * specified by the absolute path name contained in absoluteFileName.
586                 */
587                private Writer getBufferedWriter(FileChannel fileChannel, String encoding) {
588                        try {
589                                final FileChannel channel = fileChannel;
590                                if (transactional) {
591                                        TransactionAwareBufferedWriter writer = new TransactionAwareBufferedWriter(channel, () -> closeStream());
592
593                                        writer.setEncoding(encoding);
594                                        writer.setForceSync(forceSync);
595                                        return writer;
596                                }
597                                else {
598                                        Writer writer = new BufferedWriter(Channels.newWriter(fileChannel, encoding)) {
599                                                @Override
600                                                public void flush() throws IOException {
601                                                        super.flush();
602                                                        if (forceSync) {
603                                                                channel.force(false);
604                                                        }
605                                                }
606                                        };
607
608                                        return writer;
609                                }
610                        }
611                        catch (UnsupportedCharsetException ucse) {
612                                throw new ItemStreamException("Bad encoding configuration for output file " + fileChannel, ucse);
613                        }
614                }
615
616                /**
617                 * Checks (on setState) to make sure that the current output file's size
618                 * is not smaller than the last saved commit point. If it is, then the
619                 * file has been damaged in some way and whole task must be started over
620                 * again from the beginning.
621                 * @throws IOException if there is an IO problem
622                 */
623                private void checkFileSize() throws IOException {
624                        long size = -1;
625
626                        outputBufferedWriter.flush();
627                        size = fileChannel.size();
628
629                        if (size < lastMarkedByteOffsetPosition) {
630                                throw new ItemStreamException("Current file size is smaller than size at last commit");
631                        }
632                }
633
634        }
635
636}