001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.batch.item.database;
018
019import java.io.PrintWriter;
020import java.lang.reflect.InvocationHandler;
021import java.lang.reflect.InvocationTargetException;
022import java.lang.reflect.Method;
023import java.lang.reflect.Proxy;
024import java.sql.Connection;
025import java.sql.SQLException;
026import java.sql.SQLFeatureNotSupportedException;
027import java.util.logging.Logger;
028
029import javax.sql.DataSource;
030
031import org.springframework.beans.factory.InitializingBean;
032import org.springframework.jdbc.datasource.ConnectionProxy;
033import org.springframework.jdbc.datasource.DataSourceUtils;
034import org.springframework.jdbc.datasource.SmartDataSource;
035import org.springframework.transaction.support.TransactionSynchronizationManager;
036import org.springframework.util.Assert;
037import org.springframework.util.MethodInvoker;
038
039/**
040 * Implementation of {@link SmartDataSource} that is capable of keeping a single
041 * JDBC Connection which is NOT closed after each use even if
042 * {@link Connection#close()} is called.
043 *
044 * The connection can be kept open over multiple transactions when used together
045 * with any of Spring's
046 * {@link org.springframework.transaction.PlatformTransactionManager}
047 * implementations.
048 *
049 * <p>
050 * Loosely based on the SingleConnectionDataSource implementation in Spring
051 * Core. Intended to be used with the {@link JdbcCursorItemReader} to provide a
052 * connection that remains open across transaction boundaries, It remains open
053 * for the life of the cursor, and can be shared with the main transaction of
054 * the rest of the step processing.
055 *
056 * <p>
057 * Once close suppression has been turned on for a connection, it will be
058 * returned for the first {@link #getConnection()} call. Any subsequent calls to
059 * {@link #getConnection()} will retrieve a new connection from the wrapped
060 * {@link DataSource} until the {@link DataSourceUtils} queries whether the
061 * connection should be closed or not by calling
062 * {@link #shouldClose(Connection)} for the close-suppressed {@link Connection}.
063 * At that point the cycle starts over again, and the next
064 * {@link #getConnection()} call will have the {@link Connection} that is being
065 * close-suppressed returned. This allows the use of the close-suppressed
066 * {@link Connection} to be the main {@link Connection} for an extended data
067 * access process. The close suppression is turned off by calling
068 * {@link #stopCloseSuppression(Connection)}.
069 *
070 * <p>
071 * This class is not multi-threading capable.
072 *
073 * <p>
074 * The connection returned will be a close-suppressing proxy instead of the
075 * physical {@link Connection}. Be aware that you will not be able to cast this
076 * to a native <code>OracleConnection</code> or the like anymore; you'd be required to use
077 * {@link java.sql.Connection#unwrap(Class)}.
078 *
079 * @author Thomas Risberg
080 * @see #getConnection()
081 * @see java.sql.Connection#close()
082 * @see DataSourceUtils#releaseConnection
083 * @see java.sql.Connection#unwrap(Class)
084 * @since 2.0
085 */
086public class ExtendedConnectionDataSourceProxy implements SmartDataSource, InitializingBean {
087
088        /** Provided DataSource */
089        private DataSource dataSource;
090
091        /** The connection to suppress close calls for */
092        private Connection closeSuppressedConnection = null;
093
094        /** The connection to suppress close calls for */
095        private boolean borrowedConnection = false;
096
097        /** Synchronization monitor for the shared Connection */
098        private final Object connectionMonitor = new Object();
099
100        /**
101         * No arg constructor for use when configured using JavaBean style.
102         */
103        public ExtendedConnectionDataSourceProxy() {
104        }
105
106        /**
107         * Constructor that takes as a parameter with the {@link DataSource} to be
108         * wrapped.
109         *
110         * @param dataSource DataSource to be used
111         */
112        public ExtendedConnectionDataSourceProxy(DataSource dataSource) {
113                this.dataSource = dataSource;
114        }
115
116        /**
117         * Setter for the {@link DataSource} that is to be wrapped.
118         *
119         * @param dataSource the DataSource
120         */
121        public void setDataSource(DataSource dataSource) {
122                this.dataSource = dataSource;
123        }
124
125        /**
126         * @see SmartDataSource
127         */
128        @Override
129        public boolean shouldClose(Connection connection) {
130                boolean shouldClose = !isCloseSuppressionActive(connection);
131                if (borrowedConnection && closeSuppressedConnection.equals(connection)) {
132                        borrowedConnection = false;
133                }
134                return shouldClose;
135        }
136
137        /**
138         * Return the status of close suppression being activated for a given
139         * {@link Connection}
140         *
141         * @param connection the {@link Connection} that the close suppression
142         * status is requested for
143         * @return true or false
144         */
145        public boolean isCloseSuppressionActive(Connection connection) {
146                return connection != null && connection.equals(closeSuppressedConnection);
147        }
148
149        /**
150         *
151         * @param connection the {@link Connection} that close suppression is
152         * requested for
153         */
154        public void startCloseSuppression(Connection connection) {
155                synchronized (this.connectionMonitor) {
156                        closeSuppressedConnection = connection;
157                        if (TransactionSynchronizationManager.isActualTransactionActive()) {
158                                borrowedConnection = true;
159                        }
160                }
161        }
162
163        /**
164         *
165         * @param connection the {@link Connection} that close suppression should be
166         * turned off for
167         */
168        public void stopCloseSuppression(Connection connection) {
169                synchronized (this.connectionMonitor) {
170                        closeSuppressedConnection = null;
171                        borrowedConnection = false;
172                }
173        }
174
175        @Override
176        public Connection getConnection() throws SQLException {
177                synchronized (this.connectionMonitor) {
178                        return initConnection(null, null);
179                }
180        }
181
182        @Override
183        public Connection getConnection(String username, String password) throws SQLException {
184                synchronized (this.connectionMonitor) {
185                        return initConnection(username, password);
186                }
187        }
188
189        private boolean completeCloseCall(Connection connection) {
190                if (borrowedConnection && closeSuppressedConnection.equals(connection)) {
191                        borrowedConnection = false;
192                }
193                return isCloseSuppressionActive(connection);
194        }
195
196        private Connection initConnection(String username, String password) throws SQLException {
197                if (closeSuppressedConnection != null) {
198                        if (!borrowedConnection) {
199                                borrowedConnection = true;
200                                return closeSuppressedConnection;
201                        }
202                }
203                Connection target;
204                if (username != null) {
205                        target = dataSource.getConnection(username, password);
206                }
207                else {
208                        target = dataSource.getConnection();
209                }
210
211                return getCloseSuppressingConnectionProxy(target);
212        }
213
214        @Override
215        public PrintWriter getLogWriter() throws SQLException {
216                return dataSource.getLogWriter();
217        }
218
219        @Override
220        public int getLoginTimeout() throws SQLException {
221                return dataSource.getLoginTimeout();
222        }
223
224        @Override
225        public void setLogWriter(PrintWriter out) throws SQLException {
226                dataSource.setLogWriter(out);
227        }
228
229        @Override
230        public void setLoginTimeout(int seconds) throws SQLException {
231                dataSource.setLoginTimeout(seconds);
232        }
233
234        /**
235         * Wrap the given Connection with a proxy that delegates every method call
236         * to it but suppresses close calls.
237         * @param target the original Connection to wrap
238         * @return the wrapped Connection
239         */
240        @SuppressWarnings("rawtypes")
241        protected Connection getCloseSuppressingConnectionProxy(Connection target) {
242                return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
243                                new Class[] { ConnectionProxy.class }, new CloseSuppressingInvocationHandler(target, this));
244        }
245
246        /**
247         * Invocation handler that suppresses close calls on JDBC Connections until
248         * the associated instance of the ExtendedConnectionDataSourceProxy
249         * determines the connection should actually be closed.
250         */
251        private static class CloseSuppressingInvocationHandler implements InvocationHandler {
252
253                private final Connection target;
254
255                private final ExtendedConnectionDataSourceProxy dataSource;
256
257                public CloseSuppressingInvocationHandler(Connection target, ExtendedConnectionDataSourceProxy dataSource) {
258                        this.dataSource = dataSource;
259                        this.target = target;
260                }
261
262                @Override
263                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
264                        // Invocation on ConnectionProxy interface coming in...
265
266                        switch (method.getName()) {
267                                case "equals":
268                                        // Only consider equal when proxies are identical.
269                                        return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);
270                                case "hashCode":
271                                        // Use hashCode of Connection proxy.
272                                        return System.identityHashCode(proxy);
273                                case "close":
274                                        // Handle close method: don't pass the call on if we are
275                                        // suppressing close calls.
276                                        if (dataSource.completeCloseCall((Connection) proxy)) {
277                                                return null;
278                                        }
279                                        else {
280                                                target.close();
281                                                return null;
282                                        }
283                                case "getTargetConnection":
284                                        // Handle getTargetConnection method: return underlying
285                                        // Connection.
286                                        return this.target;
287                        }
288
289                        // Invoke method on target Connection.
290                        try {
291                                return method.invoke(this.target, args);
292                        }
293                        catch (InvocationTargetException ex) {
294                                throw ex.getTargetException();
295                        }
296                }
297        }
298
299        /**
300         * Performs only a 'shallow' non-recursive check of self's and delegate's
301         * class to retain Java 5 compatibility.
302         */
303        @Override
304        public boolean isWrapperFor(Class<?> iface) throws SQLException {
305                return iface.isAssignableFrom(SmartDataSource.class) || iface.isAssignableFrom(dataSource.getClass());
306        }
307
308        /**
309         * Returns either self or delegate (in this order) if one of them can be
310         * cast to supplied parameter class. Does *not* support recursive unwrapping
311         * of the delegate to retain Java 5 compatibility.
312         */
313        @Override
314        public <T> T unwrap(Class<T> iface) throws SQLException {
315                if (iface.isAssignableFrom(SmartDataSource.class)) {
316                        @SuppressWarnings("unchecked")
317                        T casted = (T) this;
318                        return casted;
319                }
320                else if (iface.isAssignableFrom(dataSource.getClass())) {
321                        @SuppressWarnings("unchecked")
322                        T casted = (T) dataSource;
323                        return casted;
324                }
325                throw new SQLException("Unsupported class " + iface.getSimpleName());
326        }
327
328        @Override
329        public void afterPropertiesSet() throws Exception {
330                Assert.notNull(dataSource, "DataSource is required");
331        }
332
333        /**
334         * Added due to JDK 7 compatibility.
335         */
336        public Logger getParentLogger() throws SQLFeatureNotSupportedException{
337                MethodInvoker invoker = new MethodInvoker();
338                invoker.setTargetObject(dataSource);
339                invoker.setTargetMethod("getParentLogger");
340
341                try {
342                        invoker.prepare();
343                        return (Logger) invoker.invoke();
344                } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException nsme) {
345                        throw new SQLFeatureNotSupportedException(nsme);
346                }
347        }
348}