001/*
002 * Copyright 2006-2012 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package org.springframework.batch.support.transaction;
017
018import java.io.IOException;
019import java.io.UnsupportedEncodingException;
020import java.io.Writer;
021import java.nio.ByteBuffer;
022import java.nio.channels.FileChannel;
023
024import org.springframework.batch.item.WriteFailedException;
025import org.springframework.transaction.support.TransactionSynchronizationAdapter;
026import org.springframework.transaction.support.TransactionSynchronizationManager;
027
028/**
029 * Wrapper for a {@link FileChannel} that delays actually writing to or closing the
030 * buffer if a transaction is active. If a transaction is detected on the call
031 * to {@link #write(String)} the parameter is buffered and passed on to the
032 * underlying writer only when the transaction is committed.
033 *
034 * @author Dave Syer
035 * @author Michael Minella
036 *
037 */
038public class TransactionAwareBufferedWriter extends Writer {
039
040        private final Object bufferKey;
041
042        private final Object closeKey;
043
044        private FileChannel channel;
045
046        private final Runnable closeCallback;
047
048        // default encoding for writing to output files - set to UTF-8.
049        private static final String DEFAULT_CHARSET = "UTF-8";
050
051        private String encoding = DEFAULT_CHARSET;
052
053        private boolean forceSync = false;
054
055        /**
056         * Create a new instance with the underlying file channel provided, and a callback
057         * to execute on close. The callback should clean up related resources like
058         * output streams or channels.
059         *
060         * @param channel channel used to do the actual file IO
061         * @param closeCallback callback to execute on close
062         */
063        public TransactionAwareBufferedWriter(FileChannel channel, Runnable closeCallback) {
064                super();
065                this.channel = channel;
066                this.closeCallback = closeCallback;
067                this.bufferKey = new Object();
068                this.closeKey = new Object();
069        }
070
071        public void setEncoding(String encoding) {
072                this.encoding = encoding;
073        }
074
075        /**
076         * Flag to indicate that changes should be force-synced to disk on flush.
077         * Defaults to false, which means that even with a local disk changes could
078         * be lost if the OS crashes in between a write and a cache flush. Setting
079         * to true may result in slower performance for usage patterns involving
080         * many frequent writes.
081         *
082         * @param forceSync the flag value to set
083         */
084        public void setForceSync(boolean forceSync) {
085                this.forceSync = forceSync;
086        }
087
088        /**
089         * @return
090         */
091        private StringBuilder getCurrentBuffer() {
092
093                if (!TransactionSynchronizationManager.hasResource(bufferKey)) {
094
095                        TransactionSynchronizationManager.bindResource(bufferKey, new StringBuilder());
096
097                        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
098                                @Override
099                                public void afterCompletion(int status) {
100                                        clear();
101                                }
102
103                                @Override
104                                public void beforeCommit(boolean readOnly) {
105                                        try {
106                                                if(!readOnly) {
107                                                        complete();
108                                                }
109                                        }
110                                        catch (IOException e) {
111                                                throw new FlushFailedException("Could not write to output buffer", e);
112                                        }
113                                }
114
115                                private void complete() throws IOException {
116                                        StringBuilder buffer = (StringBuilder) TransactionSynchronizationManager.getResource(bufferKey);
117                                        if (buffer != null) {
118                                                String string = buffer.toString();
119                                                byte[] bytes = string.getBytes(encoding);
120                                                int bufferLength = bytes.length;
121                                                ByteBuffer bb = ByteBuffer.wrap(bytes);
122                                                int bytesWritten = channel.write(bb);
123                                                if(bytesWritten != bufferLength) {
124                                                        throw new IOException("All bytes to be written were not successfully written");
125                                                }
126                                                if (forceSync) {
127                                                        channel.force(false);
128                                                }
129                                                if (TransactionSynchronizationManager.hasResource(closeKey)) {
130                                                        closeCallback.run();
131                                                }
132                                        }
133                                }
134
135                                private void clear() {
136                                        if (TransactionSynchronizationManager.hasResource(bufferKey)) {
137                                                TransactionSynchronizationManager.unbindResource(bufferKey);
138                                        }
139                                        if (TransactionSynchronizationManager.hasResource(closeKey)) {
140                                                TransactionSynchronizationManager.unbindResource(closeKey);
141                                        }
142                                }
143
144                        });
145
146                }
147
148                return (StringBuilder) TransactionSynchronizationManager.getResource(bufferKey);
149
150        }
151
152        /**
153         * Convenience method for clients to determine if there is any unflushed
154         * data.
155         *
156         * @return the current size (in bytes) of unflushed buffered data
157         */
158        public long getBufferSize() {
159                if (!transactionActive()) {
160                        return 0L;
161                }
162                try {
163                        return getCurrentBuffer().toString().getBytes(encoding).length;
164                } catch (UnsupportedEncodingException e) {
165                        throw new WriteFailedException("Could not determine buffer size because of unsupported encoding: " + encoding, e);
166                }
167        }
168
169        /**
170         * @return
171         */
172        private boolean transactionActive() {
173                return TransactionSynchronizationManager.isActualTransactionActive();
174        }
175
176        /*
177         * (non-Javadoc)
178         *
179         * @see java.io.Writer#close()
180         */
181        @Override
182        public void close() throws IOException {
183                if (transactionActive()) {
184                        if (getCurrentBuffer().length() > 0) {
185                                TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE);
186                        }
187                        return;
188                }
189                closeCallback.run();
190        }
191
192        /*
193         * (non-Javadoc)
194         *
195         * @see java.io.Writer#flush()
196         */
197        @Override
198        public void flush() throws IOException {
199                if (!transactionActive() && forceSync) {
200                        channel.force(false);
201                }
202        }
203
204        /*
205         * (non-Javadoc)
206         *
207         * @see java.io.Writer#write(char[], int, int)
208         */
209        @Override
210        public void write(char[] cbuf, int off, int len) throws IOException {
211
212                if (!transactionActive()) {
213                        char [] subArray = new char[len];
214                        System.arraycopy(cbuf, off, subArray, 0, len);
215                        byte[] bytes = new String(subArray).getBytes(encoding);
216                        int length = bytes.length;
217                        ByteBuffer bb = ByteBuffer.wrap(bytes);
218                        int bytesWritten = channel.write(bb);
219                        if(bytesWritten != length) {
220                                throw new IOException("Unable to write all data.  Bytes to write: " + len + ".  Bytes written: " + bytesWritten);
221                        }
222                        return;
223                }
224
225                StringBuilder buffer = getCurrentBuffer();
226                buffer.append(cbuf, off, len);
227        }
228}