001/* 002 * Copyright 2006-2012 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.springframework.batch.support.transaction; 017 018import java.io.IOException; 019import java.io.UnsupportedEncodingException; 020import java.io.Writer; 021import java.nio.ByteBuffer; 022import java.nio.channels.FileChannel; 023 024import org.springframework.batch.item.WriteFailedException; 025import org.springframework.transaction.support.TransactionSynchronizationAdapter; 026import org.springframework.transaction.support.TransactionSynchronizationManager; 027 028/** 029 * Wrapper for a {@link FileChannel} that delays actually writing to or closing the 030 * buffer if a transaction is active. If a transaction is detected on the call 031 * to {@link #write(String)} the parameter is buffered and passed on to the 032 * underlying writer only when the transaction is committed. 033 * 034 * @author Dave Syer 035 * @author Michael Minella 036 * 037 */ 038public class TransactionAwareBufferedWriter extends Writer { 039 040 private final Object bufferKey; 041 042 private final Object closeKey; 043 044 private FileChannel channel; 045 046 private final Runnable closeCallback; 047 048 // default encoding for writing to output files - set to UTF-8. 049 private static final String DEFAULT_CHARSET = "UTF-8"; 050 051 private String encoding = DEFAULT_CHARSET; 052 053 private boolean forceSync = false; 054 055 /** 056 * Create a new instance with the underlying file channel provided, and a callback 057 * to execute on close. The callback should clean up related resources like 058 * output streams or channels. 059 * 060 * @param channel channel used to do the actual file IO 061 * @param closeCallback callback to execute on close 062 */ 063 public TransactionAwareBufferedWriter(FileChannel channel, Runnable closeCallback) { 064 super(); 065 this.channel = channel; 066 this.closeCallback = closeCallback; 067 this.bufferKey = new Object(); 068 this.closeKey = new Object(); 069 } 070 071 public void setEncoding(String encoding) { 072 this.encoding = encoding; 073 } 074 075 /** 076 * Flag to indicate that changes should be force-synced to disk on flush. 077 * Defaults to false, which means that even with a local disk changes could 078 * be lost if the OS crashes in between a write and a cache flush. Setting 079 * to true may result in slower performance for usage patterns involving 080 * many frequent writes. 081 * 082 * @param forceSync the flag value to set 083 */ 084 public void setForceSync(boolean forceSync) { 085 this.forceSync = forceSync; 086 } 087 088 /** 089 * @return 090 */ 091 private StringBuilder getCurrentBuffer() { 092 093 if (!TransactionSynchronizationManager.hasResource(bufferKey)) { 094 095 TransactionSynchronizationManager.bindResource(bufferKey, new StringBuilder()); 096 097 TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { 098 @Override 099 public void afterCompletion(int status) { 100 clear(); 101 } 102 103 @Override 104 public void beforeCommit(boolean readOnly) { 105 try { 106 if(!readOnly) { 107 complete(); 108 } 109 } 110 catch (IOException e) { 111 throw new FlushFailedException("Could not write to output buffer", e); 112 } 113 } 114 115 private void complete() throws IOException { 116 StringBuilder buffer = (StringBuilder) TransactionSynchronizationManager.getResource(bufferKey); 117 if (buffer != null) { 118 String string = buffer.toString(); 119 byte[] bytes = string.getBytes(encoding); 120 int bufferLength = bytes.length; 121 ByteBuffer bb = ByteBuffer.wrap(bytes); 122 int bytesWritten = channel.write(bb); 123 if(bytesWritten != bufferLength) { 124 throw new IOException("All bytes to be written were not successfully written"); 125 } 126 if (forceSync) { 127 channel.force(false); 128 } 129 if (TransactionSynchronizationManager.hasResource(closeKey)) { 130 closeCallback.run(); 131 } 132 } 133 } 134 135 private void clear() { 136 if (TransactionSynchronizationManager.hasResource(bufferKey)) { 137 TransactionSynchronizationManager.unbindResource(bufferKey); 138 } 139 if (TransactionSynchronizationManager.hasResource(closeKey)) { 140 TransactionSynchronizationManager.unbindResource(closeKey); 141 } 142 } 143 144 }); 145 146 } 147 148 return (StringBuilder) TransactionSynchronizationManager.getResource(bufferKey); 149 150 } 151 152 /** 153 * Convenience method for clients to determine if there is any unflushed 154 * data. 155 * 156 * @return the current size (in bytes) of unflushed buffered data 157 */ 158 public long getBufferSize() { 159 if (!transactionActive()) { 160 return 0L; 161 } 162 try { 163 return getCurrentBuffer().toString().getBytes(encoding).length; 164 } catch (UnsupportedEncodingException e) { 165 throw new WriteFailedException("Could not determine buffer size because of unsupported encoding: " + encoding, e); 166 } 167 } 168 169 /** 170 * @return 171 */ 172 private boolean transactionActive() { 173 return TransactionSynchronizationManager.isActualTransactionActive(); 174 } 175 176 /* 177 * (non-Javadoc) 178 * 179 * @see java.io.Writer#close() 180 */ 181 @Override 182 public void close() throws IOException { 183 if (transactionActive()) { 184 if (getCurrentBuffer().length() > 0) { 185 TransactionSynchronizationManager.bindResource(closeKey, Boolean.TRUE); 186 } 187 return; 188 } 189 closeCallback.run(); 190 } 191 192 /* 193 * (non-Javadoc) 194 * 195 * @see java.io.Writer#flush() 196 */ 197 @Override 198 public void flush() throws IOException { 199 if (!transactionActive() && forceSync) { 200 channel.force(false); 201 } 202 } 203 204 /* 205 * (non-Javadoc) 206 * 207 * @see java.io.Writer#write(char[], int, int) 208 */ 209 @Override 210 public void write(char[] cbuf, int off, int len) throws IOException { 211 212 if (!transactionActive()) { 213 char [] subArray = new char[len]; 214 System.arraycopy(cbuf, off, subArray, 0, len); 215 byte[] bytes = new String(subArray).getBytes(encoding); 216 int length = bytes.length; 217 ByteBuffer bb = ByteBuffer.wrap(bytes); 218 int bytesWritten = channel.write(bb); 219 if(bytesWritten != length) { 220 throw new IOException("Unable to write all data. Bytes to write: " + len + ". Bytes written: " + bytesWritten); 221 } 222 return; 223 } 224 225 StringBuilder buffer = getCurrentBuffer(); 226 buffer.append(cbuf, off, len); 227 } 228}