001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.connection;
018
019import java.lang.reflect.InvocationHandler;
020import java.lang.reflect.InvocationTargetException;
021import java.lang.reflect.Method;
022import java.lang.reflect.Proxy;
023import java.util.ArrayList;
024import java.util.List;
025import javax.jms.Connection;
026import javax.jms.ConnectionFactory;
027import javax.jms.JMSException;
028import javax.jms.QueueConnection;
029import javax.jms.QueueConnectionFactory;
030import javax.jms.QueueSession;
031import javax.jms.Session;
032import javax.jms.TopicConnection;
033import javax.jms.TopicConnectionFactory;
034import javax.jms.TopicSession;
035import javax.jms.TransactionInProgressException;
036
037import org.springframework.util.Assert;
038import org.springframework.util.ClassUtils;
039
040/**
041 * Proxy for a target JMS {@link javax.jms.ConnectionFactory}, adding awareness of
042 * Spring-managed transactions. Similar to a transactional JNDI ConnectionFactory
043 * as provided by a Java EE application server.
044 *
045 * <p>Messaging code which should remain unaware of Spring's JMS support can work with
046 * this proxy to seamlessly participate in Spring-managed transactions. Note that the
047 * transaction manager, for example {@link JmsTransactionManager}, still needs to work
048 * with the underlying ConnectionFactory, <i>not</i> with this proxy.
049 *
050 * <p><b>Make sure that TransactionAwareConnectionFactoryProxy is the outermost
051 * ConnectionFactory of a chain of ConnectionFactory proxies/adapters.</b>
052 * TransactionAwareConnectionFactoryProxy can delegate either directly to the
053 * target factory or to some intermediary adapter like
054 * {@link UserCredentialsConnectionFactoryAdapter}.
055 *
056 * <p>Delegates to {@link ConnectionFactoryUtils} for automatically participating
057 * in thread-bound transactions, for example managed by {@link JmsTransactionManager}.
058 * {@code createSession} calls and {@code close} calls on returned Sessions
059 * will behave properly within a transaction, that is, always work on the transactional
060 * Session. If not within a transaction, normal ConnectionFactory behavior applies.
061 *
062 * <p>Note that transactional JMS Sessions will be registered on a per-Connection
063 * basis. To share the same JMS Session across a transaction, make sure that you
064 * operate on the same JMS Connection handle - either through reusing the handle
065 * or through configuring a {@link SingleConnectionFactory} underneath.
066 *
067 * <p>Returned transactional Session proxies will implement the {@link SessionProxy}
068 * interface to allow for access to the underlying target Session. This is only
069 * intended for accessing vendor-specific Session API or for testing purposes
070 * (e.g. to perform manual transaction control). For typical application purposes,
071 * simply use the standard JMS Session interface.
072 *
073 * @author Juergen Hoeller
074 * @since 2.0
075 * @see UserCredentialsConnectionFactoryAdapter
076 * @see SingleConnectionFactory
077 */
078public class TransactionAwareConnectionFactoryProxy
079                implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
080
081        private ConnectionFactory targetConnectionFactory;
082
083        private boolean synchedLocalTransactionAllowed = false;
084
085
086        /**
087         * Create a new TransactionAwareConnectionFactoryProxy.
088         */
089        public TransactionAwareConnectionFactoryProxy() {
090        }
091
092        /**
093         * Create a new TransactionAwareConnectionFactoryProxy.
094         * @param targetConnectionFactory the target ConnectionFactory
095         */
096        public TransactionAwareConnectionFactoryProxy(ConnectionFactory targetConnectionFactory) {
097                setTargetConnectionFactory(targetConnectionFactory);
098        }
099
100
101        /**
102         * Set the target ConnectionFactory that this ConnectionFactory should delegate to.
103         */
104        public final void setTargetConnectionFactory(ConnectionFactory targetConnectionFactory) {
105                Assert.notNull(targetConnectionFactory, "'targetConnectionFactory' must not be null");
106                this.targetConnectionFactory = targetConnectionFactory;
107        }
108
109        /**
110         * Return the target ConnectionFactory that this ConnectionFactory should delegate to.
111         */
112        protected ConnectionFactory getTargetConnectionFactory() {
113                return this.targetConnectionFactory;
114        }
115
116        /**
117         * Set whether to allow for a local JMS transaction that is synchronized with a
118         * Spring-managed transaction (where the main transaction might be a JDBC-based
119         * one for a specific DataSource, for example), with the JMS transaction committing
120         * right after the main transaction. If not allowed, the given ConnectionFactory
121         * needs to handle transaction enlistment underneath the covers.
122         * <p>Default is "false": If not within a managed transaction that encompasses
123         * the underlying JMS ConnectionFactory, standard Sessions will be returned.
124         * Turn this flag on to allow participation in any Spring-managed transaction,
125         * with a local JMS transaction synchronized with the main transaction.
126         */
127        public void setSynchedLocalTransactionAllowed(boolean synchedLocalTransactionAllowed) {
128                this.synchedLocalTransactionAllowed = synchedLocalTransactionAllowed;
129        }
130
131        /**
132         * Return whether to allow for a local JMS transaction that is synchronized
133         * with a Spring-managed transaction.
134         */
135        protected boolean isSynchedLocalTransactionAllowed() {
136                return this.synchedLocalTransactionAllowed;
137        }
138
139
140        @Override
141        public Connection createConnection() throws JMSException {
142                Connection targetConnection = this.targetConnectionFactory.createConnection();
143                return getTransactionAwareConnectionProxy(targetConnection);
144        }
145
146        @Override
147        public Connection createConnection(String username, String password) throws JMSException {
148                Connection targetConnection = this.targetConnectionFactory.createConnection(username, password);
149                return getTransactionAwareConnectionProxy(targetConnection);
150        }
151
152        @Override
153        public QueueConnection createQueueConnection() throws JMSException {
154                if (!(this.targetConnectionFactory instanceof QueueConnectionFactory)) {
155                        throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no QueueConnectionFactory");
156                }
157                QueueConnection targetConnection =
158                                ((QueueConnectionFactory) this.targetConnectionFactory).createQueueConnection();
159                return (QueueConnection) getTransactionAwareConnectionProxy(targetConnection);
160        }
161
162        @Override
163        public QueueConnection createQueueConnection(String username, String password) throws JMSException {
164                if (!(this.targetConnectionFactory instanceof QueueConnectionFactory)) {
165                        throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no QueueConnectionFactory");
166                }
167                QueueConnection targetConnection =
168                                ((QueueConnectionFactory) this.targetConnectionFactory).createQueueConnection(username, password);
169                return (QueueConnection) getTransactionAwareConnectionProxy(targetConnection);
170        }
171
172        @Override
173        public TopicConnection createTopicConnection() throws JMSException {
174                if (!(this.targetConnectionFactory instanceof TopicConnectionFactory)) {
175                        throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory");
176                }
177                TopicConnection targetConnection =
178                                ((TopicConnectionFactory) this.targetConnectionFactory).createTopicConnection();
179                return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection);
180        }
181
182        @Override
183        public TopicConnection createTopicConnection(String username, String password) throws JMSException {
184                if (!(this.targetConnectionFactory instanceof TopicConnectionFactory)) {
185                        throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory");
186                }
187                TopicConnection targetConnection =
188                                ((TopicConnectionFactory) this.targetConnectionFactory).createTopicConnection(username, password);
189                return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection);
190        }
191
192
193        /**
194         * Wrap the given Connection with a proxy that delegates every method call to it
195         * but handles Session lookup in a transaction-aware fashion.
196         * @param target the original Connection to wrap
197         * @return the wrapped Connection
198         */
199        protected Connection getTransactionAwareConnectionProxy(Connection target) {
200                List<Class<?>> classes = new ArrayList<Class<?>>(3);
201                classes.add(Connection.class);
202                if (target instanceof QueueConnection) {
203                        classes.add(QueueConnection.class);
204                }
205                if (target instanceof TopicConnection) {
206                        classes.add(TopicConnection.class);
207                }
208                return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(),
209                                ClassUtils.toClassArray(classes), new TransactionAwareConnectionInvocationHandler(target));
210        }
211
212
213        /**
214         * Invocation handler that exposes transactional Sessions for the underlying Connection.
215         */
216        private class TransactionAwareConnectionInvocationHandler implements InvocationHandler {
217
218                private final Connection target;
219
220                public TransactionAwareConnectionInvocationHandler(Connection target) {
221                        this.target = target;
222                }
223
224                @Override
225                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
226                        // Invocation on ConnectionProxy interface coming in...
227
228                        if (method.getName().equals("equals")) {
229                                // Only consider equal when proxies are identical.
230                                return (proxy == args[0]);
231                        }
232                        else if (method.getName().equals("hashCode")) {
233                                // Use hashCode of Connection proxy.
234                                return System.identityHashCode(proxy);
235                        }
236                        else if (Session.class == method.getReturnType()) {
237                                Session session = ConnectionFactoryUtils.getTransactionalSession(
238                                                getTargetConnectionFactory(), this.target, isSynchedLocalTransactionAllowed());
239                                if (session != null) {
240                                        return getCloseSuppressingSessionProxy(session);
241                                }
242                        }
243                        else if (QueueSession.class == method.getReturnType()) {
244                                QueueSession session = ConnectionFactoryUtils.getTransactionalQueueSession(
245                                                (QueueConnectionFactory) getTargetConnectionFactory(), (QueueConnection) this.target,
246                                                isSynchedLocalTransactionAllowed());
247                                if (session != null) {
248                                        return getCloseSuppressingSessionProxy(session);
249                                }
250                        }
251                        else if (TopicSession.class == method.getReturnType()) {
252                                TopicSession session = ConnectionFactoryUtils.getTransactionalTopicSession(
253                                                (TopicConnectionFactory) getTargetConnectionFactory(), (TopicConnection) this.target,
254                                                isSynchedLocalTransactionAllowed());
255                                if (session != null) {
256                                        return getCloseSuppressingSessionProxy(session);
257                                }
258                        }
259
260                        // Invoke method on target Connection.
261                        try {
262                                return method.invoke(this.target, args);
263                        }
264                        catch (InvocationTargetException ex) {
265                                throw ex.getTargetException();
266                        }
267                }
268
269                private Session getCloseSuppressingSessionProxy(Session target) {
270                        List<Class<?>> classes = new ArrayList<Class<?>>(3);
271                        classes.add(SessionProxy.class);
272                        if (target instanceof QueueSession) {
273                                classes.add(QueueSession.class);
274                        }
275                        if (target instanceof TopicSession) {
276                                classes.add(TopicSession.class);
277                        }
278                        return (Session) Proxy.newProxyInstance(SessionProxy.class.getClassLoader(),
279                                        ClassUtils.toClassArray(classes), new CloseSuppressingSessionInvocationHandler(target));
280                }
281        }
282
283
284        /**
285         * Invocation handler that suppresses close calls for a transactional JMS Session.
286         */
287        private static class CloseSuppressingSessionInvocationHandler implements InvocationHandler {
288
289                private final Session target;
290
291                public CloseSuppressingSessionInvocationHandler(Session target) {
292                        this.target = target;
293                }
294
295                @Override
296                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
297                        // Invocation on SessionProxy interface coming in...
298
299                        if (method.getName().equals("equals")) {
300                                // Only consider equal when proxies are identical.
301                                return (proxy == args[0]);
302                        }
303                        else if (method.getName().equals("hashCode")) {
304                                // Use hashCode of Connection proxy.
305                                return System.identityHashCode(proxy);
306                        }
307                        else if (method.getName().equals("commit")) {
308                                throw new TransactionInProgressException("Commit call not allowed within a managed transaction");
309                        }
310                        else if (method.getName().equals("rollback")) {
311                                throw new TransactionInProgressException("Rollback call not allowed within a managed transaction");
312                        }
313                        else if (method.getName().equals("close")) {
314                                // Handle close method: not to be closed within a transaction.
315                                return null;
316                        }
317                        else if (method.getName().equals("getTargetSession")) {
318                                // Handle getTargetSession method: return underlying Session.
319                                return this.target;
320                        }
321
322                        // Invoke method on target Session.
323                        try {
324                                return method.invoke(this.target, args);
325                        }
326                        catch (InvocationTargetException ex) {
327                                throw ex.getTargetException();
328                        }
329                }
330        }
331
332}