001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.batch.item.database; 018 019import java.io.PrintWriter; 020import java.lang.reflect.InvocationHandler; 021import java.lang.reflect.InvocationTargetException; 022import java.lang.reflect.Method; 023import java.lang.reflect.Proxy; 024import java.sql.Connection; 025import java.sql.SQLException; 026import java.sql.SQLFeatureNotSupportedException; 027import java.util.logging.Logger; 028 029import javax.sql.DataSource; 030 031import org.springframework.beans.factory.InitializingBean; 032import org.springframework.jdbc.datasource.ConnectionProxy; 033import org.springframework.jdbc.datasource.DataSourceUtils; 034import org.springframework.jdbc.datasource.SmartDataSource; 035import org.springframework.transaction.support.TransactionSynchronizationManager; 036import org.springframework.util.Assert; 037import org.springframework.util.MethodInvoker; 038 039/** 040 * Implementation of {@link SmartDataSource} that is capable of keeping a single 041 * JDBC Connection which is NOT closed after each use even if 042 * {@link Connection#close()} is called. 043 * 044 * The connection can be kept open over multiple transactions when used together 045 * with any of Spring's 046 * {@link org.springframework.transaction.PlatformTransactionManager} 047 * implementations. 048 * 049 * <p> 050 * Loosely based on the SingleConnectionDataSource implementation in Spring 051 * Core. Intended to be used with the {@link JdbcCursorItemReader} to provide a 052 * connection that remains open across transaction boundaries, It remains open 053 * for the life of the cursor, and can be shared with the main transaction of 054 * the rest of the step processing. 055 * 056 * <p> 057 * Once close suppression has been turned on for a connection, it will be 058 * returned for the first {@link #getConnection()} call. Any subsequent calls to 059 * {@link #getConnection()} will retrieve a new connection from the wrapped 060 * {@link DataSource} until the {@link DataSourceUtils} queries whether the 061 * connection should be closed or not by calling 062 * {@link #shouldClose(Connection)} for the close-suppressed {@link Connection}. 063 * At that point the cycle starts over again, and the next 064 * {@link #getConnection()} call will have the {@link Connection} that is being 065 * close-suppressed returned. This allows the use of the close-suppressed 066 * {@link Connection} to be the main {@link Connection} for an extended data 067 * access process. The close suppression is turned off by calling 068 * {@link #stopCloseSuppression(Connection)}. 069 * 070 * <p> 071 * This class is not multi-threading capable. 072 * 073 * <p> 074 * The connection returned will be a close-suppressing proxy instead of the 075 * physical {@link Connection}. Be aware that you will not be able to cast this 076 * to a native <code>OracleConnection</code> or the like anymore; you'd be required to use 077 * {@link java.sql.Connection#unwrap(Class)}. 078 * 079 * @author Thomas Risberg 080 * @see #getConnection() 081 * @see java.sql.Connection#close() 082 * @see DataSourceUtils#releaseConnection 083 * @see java.sql.Connection#unwrap(Class) 084 * @since 2.0 085 */ 086public class ExtendedConnectionDataSourceProxy implements SmartDataSource, InitializingBean { 087 088 /** Provided DataSource */ 089 private DataSource dataSource; 090 091 /** The connection to suppress close calls for */ 092 private Connection closeSuppressedConnection = null; 093 094 /** The connection to suppress close calls for */ 095 private boolean borrowedConnection = false; 096 097 /** Synchronization monitor for the shared Connection */ 098 private final Object connectionMonitor = new Object(); 099 100 /** 101 * No arg constructor for use when configured using JavaBean style. 102 */ 103 public ExtendedConnectionDataSourceProxy() { 104 } 105 106 /** 107 * Constructor that takes as a parameter with the {@link DataSource} to be 108 * wrapped. 109 * 110 * @param dataSource DataSource to be used 111 */ 112 public ExtendedConnectionDataSourceProxy(DataSource dataSource) { 113 this.dataSource = dataSource; 114 } 115 116 /** 117 * Setter for the {@link DataSource} that is to be wrapped. 118 * 119 * @param dataSource the DataSource 120 */ 121 public void setDataSource(DataSource dataSource) { 122 this.dataSource = dataSource; 123 } 124 125 /** 126 * @see SmartDataSource 127 */ 128 @Override 129 public boolean shouldClose(Connection connection) { 130 boolean shouldClose = !isCloseSuppressionActive(connection); 131 if (borrowedConnection && closeSuppressedConnection.equals(connection)) { 132 borrowedConnection = false; 133 } 134 return shouldClose; 135 } 136 137 /** 138 * Return the status of close suppression being activated for a given 139 * {@link Connection} 140 * 141 * @param connection the {@link Connection} that the close suppression 142 * status is requested for 143 * @return true or false 144 */ 145 public boolean isCloseSuppressionActive(Connection connection) { 146 return connection != null && connection.equals(closeSuppressedConnection); 147 } 148 149 /** 150 * 151 * @param connection the {@link Connection} that close suppression is 152 * requested for 153 */ 154 public void startCloseSuppression(Connection connection) { 155 synchronized (this.connectionMonitor) { 156 closeSuppressedConnection = connection; 157 if (TransactionSynchronizationManager.isActualTransactionActive()) { 158 borrowedConnection = true; 159 } 160 } 161 } 162 163 /** 164 * 165 * @param connection the {@link Connection} that close suppression should be 166 * turned off for 167 */ 168 public void stopCloseSuppression(Connection connection) { 169 synchronized (this.connectionMonitor) { 170 closeSuppressedConnection = null; 171 borrowedConnection = false; 172 } 173 } 174 175 @Override 176 public Connection getConnection() throws SQLException { 177 synchronized (this.connectionMonitor) { 178 return initConnection(null, null); 179 } 180 } 181 182 @Override 183 public Connection getConnection(String username, String password) throws SQLException { 184 synchronized (this.connectionMonitor) { 185 return initConnection(username, password); 186 } 187 } 188 189 private boolean completeCloseCall(Connection connection) { 190 if (borrowedConnection && closeSuppressedConnection.equals(connection)) { 191 borrowedConnection = false; 192 } 193 return isCloseSuppressionActive(connection); 194 } 195 196 private Connection initConnection(String username, String password) throws SQLException { 197 if (closeSuppressedConnection != null) { 198 if (!borrowedConnection) { 199 borrowedConnection = true; 200 return closeSuppressedConnection; 201 } 202 } 203 Connection target; 204 if (username != null) { 205 target = dataSource.getConnection(username, password); 206 } 207 else { 208 target = dataSource.getConnection(); 209 } 210 211 return getCloseSuppressingConnectionProxy(target); 212 } 213 214 @Override 215 public PrintWriter getLogWriter() throws SQLException { 216 return dataSource.getLogWriter(); 217 } 218 219 @Override 220 public int getLoginTimeout() throws SQLException { 221 return dataSource.getLoginTimeout(); 222 } 223 224 @Override 225 public void setLogWriter(PrintWriter out) throws SQLException { 226 dataSource.setLogWriter(out); 227 } 228 229 @Override 230 public void setLoginTimeout(int seconds) throws SQLException { 231 dataSource.setLoginTimeout(seconds); 232 } 233 234 /** 235 * Wrap the given Connection with a proxy that delegates every method call 236 * to it but suppresses close calls. 237 * @param target the original Connection to wrap 238 * @return the wrapped Connection 239 */ 240 @SuppressWarnings("rawtypes") 241 protected Connection getCloseSuppressingConnectionProxy(Connection target) { 242 return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(), 243 new Class[] { ConnectionProxy.class }, new CloseSuppressingInvocationHandler(target, this)); 244 } 245 246 /** 247 * Invocation handler that suppresses close calls on JDBC Connections until 248 * the associated instance of the ExtendedConnectionDataSourceProxy 249 * determines the connection should actually be closed. 250 */ 251 private static class CloseSuppressingInvocationHandler implements InvocationHandler { 252 253 private final Connection target; 254 255 private final ExtendedConnectionDataSourceProxy dataSource; 256 257 public CloseSuppressingInvocationHandler(Connection target, ExtendedConnectionDataSourceProxy dataSource) { 258 this.dataSource = dataSource; 259 this.target = target; 260 } 261 262 @Override 263 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 264 // Invocation on ConnectionProxy interface coming in... 265 266 switch (method.getName()) { 267 case "equals": 268 // Only consider equal when proxies are identical. 269 return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE); 270 case "hashCode": 271 // Use hashCode of Connection proxy. 272 return System.identityHashCode(proxy); 273 case "close": 274 // Handle close method: don't pass the call on if we are 275 // suppressing close calls. 276 if (dataSource.completeCloseCall((Connection) proxy)) { 277 return null; 278 } 279 else { 280 target.close(); 281 return null; 282 } 283 case "getTargetConnection": 284 // Handle getTargetConnection method: return underlying 285 // Connection. 286 return this.target; 287 } 288 289 // Invoke method on target Connection. 290 try { 291 return method.invoke(this.target, args); 292 } 293 catch (InvocationTargetException ex) { 294 throw ex.getTargetException(); 295 } 296 } 297 } 298 299 /** 300 * Performs only a 'shallow' non-recursive check of self's and delegate's 301 * class to retain Java 5 compatibility. 302 */ 303 @Override 304 public boolean isWrapperFor(Class<?> iface) throws SQLException { 305 return iface.isAssignableFrom(SmartDataSource.class) || iface.isAssignableFrom(dataSource.getClass()); 306 } 307 308 /** 309 * Returns either self or delegate (in this order) if one of them can be 310 * cast to supplied parameter class. Does *not* support recursive unwrapping 311 * of the delegate to retain Java 5 compatibility. 312 */ 313 @Override 314 public <T> T unwrap(Class<T> iface) throws SQLException { 315 if (iface.isAssignableFrom(SmartDataSource.class)) { 316 @SuppressWarnings("unchecked") 317 T casted = (T) this; 318 return casted; 319 } 320 else if (iface.isAssignableFrom(dataSource.getClass())) { 321 @SuppressWarnings("unchecked") 322 T casted = (T) dataSource; 323 return casted; 324 } 325 throw new SQLException("Unsupported class " + iface.getSimpleName()); 326 } 327 328 @Override 329 public void afterPropertiesSet() throws Exception { 330 Assert.notNull(dataSource, "DataSource is required"); 331 } 332 333 /** 334 * Added due to JDK 7 compatibility. 335 */ 336 public Logger getParentLogger() throws SQLFeatureNotSupportedException{ 337 MethodInvoker invoker = new MethodInvoker(); 338 invoker.setTargetObject(dataSource); 339 invoker.setTargetMethod("getParentLogger"); 340 341 try { 342 invoker.prepare(); 343 return (Logger) invoker.invoke(); 344 } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException nsme) { 345 throw new SQLFeatureNotSupportedException(nsme); 346 } 347 } 348}