001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.connection; 018 019import java.lang.reflect.InvocationHandler; 020import java.lang.reflect.InvocationTargetException; 021import java.lang.reflect.Method; 022import java.lang.reflect.Proxy; 023import java.util.ArrayList; 024import java.util.List; 025import javax.jms.Connection; 026import javax.jms.ConnectionFactory; 027import javax.jms.JMSException; 028import javax.jms.QueueConnection; 029import javax.jms.QueueConnectionFactory; 030import javax.jms.QueueSession; 031import javax.jms.Session; 032import javax.jms.TopicConnection; 033import javax.jms.TopicConnectionFactory; 034import javax.jms.TopicSession; 035import javax.jms.TransactionInProgressException; 036 037import org.springframework.util.Assert; 038import org.springframework.util.ClassUtils; 039 040/** 041 * Proxy for a target JMS {@link javax.jms.ConnectionFactory}, adding awareness of 042 * Spring-managed transactions. Similar to a transactional JNDI ConnectionFactory 043 * as provided by a Java EE application server. 044 * 045 * <p>Messaging code which should remain unaware of Spring's JMS support can work with 046 * this proxy to seamlessly participate in Spring-managed transactions. Note that the 047 * transaction manager, for example {@link JmsTransactionManager}, still needs to work 048 * with the underlying ConnectionFactory, <i>not</i> with this proxy. 049 * 050 * <p><b>Make sure that TransactionAwareConnectionFactoryProxy is the outermost 051 * ConnectionFactory of a chain of ConnectionFactory proxies/adapters.</b> 052 * TransactionAwareConnectionFactoryProxy can delegate either directly to the 053 * target factory or to some intermediary adapter like 054 * {@link UserCredentialsConnectionFactoryAdapter}. 055 * 056 * <p>Delegates to {@link ConnectionFactoryUtils} for automatically participating 057 * in thread-bound transactions, for example managed by {@link JmsTransactionManager}. 058 * {@code createSession} calls and {@code close} calls on returned Sessions 059 * will behave properly within a transaction, that is, always work on the transactional 060 * Session. If not within a transaction, normal ConnectionFactory behavior applies. 061 * 062 * <p>Note that transactional JMS Sessions will be registered on a per-Connection 063 * basis. To share the same JMS Session across a transaction, make sure that you 064 * operate on the same JMS Connection handle - either through reusing the handle 065 * or through configuring a {@link SingleConnectionFactory} underneath. 066 * 067 * <p>Returned transactional Session proxies will implement the {@link SessionProxy} 068 * interface to allow for access to the underlying target Session. This is only 069 * intended for accessing vendor-specific Session API or for testing purposes 070 * (e.g. to perform manual transaction control). For typical application purposes, 071 * simply use the standard JMS Session interface. 072 * 073 * @author Juergen Hoeller 074 * @since 2.0 075 * @see UserCredentialsConnectionFactoryAdapter 076 * @see SingleConnectionFactory 077 */ 078public class TransactionAwareConnectionFactoryProxy 079 implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory { 080 081 private ConnectionFactory targetConnectionFactory; 082 083 private boolean synchedLocalTransactionAllowed = false; 084 085 086 /** 087 * Create a new TransactionAwareConnectionFactoryProxy. 088 */ 089 public TransactionAwareConnectionFactoryProxy() { 090 } 091 092 /** 093 * Create a new TransactionAwareConnectionFactoryProxy. 094 * @param targetConnectionFactory the target ConnectionFactory 095 */ 096 public TransactionAwareConnectionFactoryProxy(ConnectionFactory targetConnectionFactory) { 097 setTargetConnectionFactory(targetConnectionFactory); 098 } 099 100 101 /** 102 * Set the target ConnectionFactory that this ConnectionFactory should delegate to. 103 */ 104 public final void setTargetConnectionFactory(ConnectionFactory targetConnectionFactory) { 105 Assert.notNull(targetConnectionFactory, "'targetConnectionFactory' must not be null"); 106 this.targetConnectionFactory = targetConnectionFactory; 107 } 108 109 /** 110 * Return the target ConnectionFactory that this ConnectionFactory should delegate to. 111 */ 112 protected ConnectionFactory getTargetConnectionFactory() { 113 return this.targetConnectionFactory; 114 } 115 116 /** 117 * Set whether to allow for a local JMS transaction that is synchronized with a 118 * Spring-managed transaction (where the main transaction might be a JDBC-based 119 * one for a specific DataSource, for example), with the JMS transaction committing 120 * right after the main transaction. If not allowed, the given ConnectionFactory 121 * needs to handle transaction enlistment underneath the covers. 122 * <p>Default is "false": If not within a managed transaction that encompasses 123 * the underlying JMS ConnectionFactory, standard Sessions will be returned. 124 * Turn this flag on to allow participation in any Spring-managed transaction, 125 * with a local JMS transaction synchronized with the main transaction. 126 */ 127 public void setSynchedLocalTransactionAllowed(boolean synchedLocalTransactionAllowed) { 128 this.synchedLocalTransactionAllowed = synchedLocalTransactionAllowed; 129 } 130 131 /** 132 * Return whether to allow for a local JMS transaction that is synchronized 133 * with a Spring-managed transaction. 134 */ 135 protected boolean isSynchedLocalTransactionAllowed() { 136 return this.synchedLocalTransactionAllowed; 137 } 138 139 140 @Override 141 public Connection createConnection() throws JMSException { 142 Connection targetConnection = this.targetConnectionFactory.createConnection(); 143 return getTransactionAwareConnectionProxy(targetConnection); 144 } 145 146 @Override 147 public Connection createConnection(String username, String password) throws JMSException { 148 Connection targetConnection = this.targetConnectionFactory.createConnection(username, password); 149 return getTransactionAwareConnectionProxy(targetConnection); 150 } 151 152 @Override 153 public QueueConnection createQueueConnection() throws JMSException { 154 if (!(this.targetConnectionFactory instanceof QueueConnectionFactory)) { 155 throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no QueueConnectionFactory"); 156 } 157 QueueConnection targetConnection = 158 ((QueueConnectionFactory) this.targetConnectionFactory).createQueueConnection(); 159 return (QueueConnection) getTransactionAwareConnectionProxy(targetConnection); 160 } 161 162 @Override 163 public QueueConnection createQueueConnection(String username, String password) throws JMSException { 164 if (!(this.targetConnectionFactory instanceof QueueConnectionFactory)) { 165 throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no QueueConnectionFactory"); 166 } 167 QueueConnection targetConnection = 168 ((QueueConnectionFactory) this.targetConnectionFactory).createQueueConnection(username, password); 169 return (QueueConnection) getTransactionAwareConnectionProxy(targetConnection); 170 } 171 172 @Override 173 public TopicConnection createTopicConnection() throws JMSException { 174 if (!(this.targetConnectionFactory instanceof TopicConnectionFactory)) { 175 throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory"); 176 } 177 TopicConnection targetConnection = 178 ((TopicConnectionFactory) this.targetConnectionFactory).createTopicConnection(); 179 return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection); 180 } 181 182 @Override 183 public TopicConnection createTopicConnection(String username, String password) throws JMSException { 184 if (!(this.targetConnectionFactory instanceof TopicConnectionFactory)) { 185 throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory"); 186 } 187 TopicConnection targetConnection = 188 ((TopicConnectionFactory) this.targetConnectionFactory).createTopicConnection(username, password); 189 return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection); 190 } 191 192 193 /** 194 * Wrap the given Connection with a proxy that delegates every method call to it 195 * but handles Session lookup in a transaction-aware fashion. 196 * @param target the original Connection to wrap 197 * @return the wrapped Connection 198 */ 199 protected Connection getTransactionAwareConnectionProxy(Connection target) { 200 List<Class<?>> classes = new ArrayList<Class<?>>(3); 201 classes.add(Connection.class); 202 if (target instanceof QueueConnection) { 203 classes.add(QueueConnection.class); 204 } 205 if (target instanceof TopicConnection) { 206 classes.add(TopicConnection.class); 207 } 208 return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), 209 ClassUtils.toClassArray(classes), new TransactionAwareConnectionInvocationHandler(target)); 210 } 211 212 213 /** 214 * Invocation handler that exposes transactional Sessions for the underlying Connection. 215 */ 216 private class TransactionAwareConnectionInvocationHandler implements InvocationHandler { 217 218 private final Connection target; 219 220 public TransactionAwareConnectionInvocationHandler(Connection target) { 221 this.target = target; 222 } 223 224 @Override 225 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 226 // Invocation on ConnectionProxy interface coming in... 227 228 if (method.getName().equals("equals")) { 229 // Only consider equal when proxies are identical. 230 return (proxy == args[0]); 231 } 232 else if (method.getName().equals("hashCode")) { 233 // Use hashCode of Connection proxy. 234 return System.identityHashCode(proxy); 235 } 236 else if (Session.class == method.getReturnType()) { 237 Session session = ConnectionFactoryUtils.getTransactionalSession( 238 getTargetConnectionFactory(), this.target, isSynchedLocalTransactionAllowed()); 239 if (session != null) { 240 return getCloseSuppressingSessionProxy(session); 241 } 242 } 243 else if (QueueSession.class == method.getReturnType()) { 244 QueueSession session = ConnectionFactoryUtils.getTransactionalQueueSession( 245 (QueueConnectionFactory) getTargetConnectionFactory(), (QueueConnection) this.target, 246 isSynchedLocalTransactionAllowed()); 247 if (session != null) { 248 return getCloseSuppressingSessionProxy(session); 249 } 250 } 251 else if (TopicSession.class == method.getReturnType()) { 252 TopicSession session = ConnectionFactoryUtils.getTransactionalTopicSession( 253 (TopicConnectionFactory) getTargetConnectionFactory(), (TopicConnection) this.target, 254 isSynchedLocalTransactionAllowed()); 255 if (session != null) { 256 return getCloseSuppressingSessionProxy(session); 257 } 258 } 259 260 // Invoke method on target Connection. 261 try { 262 return method.invoke(this.target, args); 263 } 264 catch (InvocationTargetException ex) { 265 throw ex.getTargetException(); 266 } 267 } 268 269 private Session getCloseSuppressingSessionProxy(Session target) { 270 List<Class<?>> classes = new ArrayList<Class<?>>(3); 271 classes.add(SessionProxy.class); 272 if (target instanceof QueueSession) { 273 classes.add(QueueSession.class); 274 } 275 if (target instanceof TopicSession) { 276 classes.add(TopicSession.class); 277 } 278 return (Session) Proxy.newProxyInstance(SessionProxy.class.getClassLoader(), 279 ClassUtils.toClassArray(classes), new CloseSuppressingSessionInvocationHandler(target)); 280 } 281 } 282 283 284 /** 285 * Invocation handler that suppresses close calls for a transactional JMS Session. 286 */ 287 private static class CloseSuppressingSessionInvocationHandler implements InvocationHandler { 288 289 private final Session target; 290 291 public CloseSuppressingSessionInvocationHandler(Session target) { 292 this.target = target; 293 } 294 295 @Override 296 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 297 // Invocation on SessionProxy interface coming in... 298 299 if (method.getName().equals("equals")) { 300 // Only consider equal when proxies are identical. 301 return (proxy == args[0]); 302 } 303 else if (method.getName().equals("hashCode")) { 304 // Use hashCode of Connection proxy. 305 return System.identityHashCode(proxy); 306 } 307 else if (method.getName().equals("commit")) { 308 throw new TransactionInProgressException("Commit call not allowed within a managed transaction"); 309 } 310 else if (method.getName().equals("rollback")) { 311 throw new TransactionInProgressException("Rollback call not allowed within a managed transaction"); 312 } 313 else if (method.getName().equals("close")) { 314 // Handle close method: not to be closed within a transaction. 315 return null; 316 } 317 else if (method.getName().equals("getTargetSession")) { 318 // Handle getTargetSession method: return underlying Session. 319 return this.target; 320 } 321 322 // Invoke method on target Session. 323 try { 324 return method.invoke(this.target, args); 325 } 326 catch (InvocationTargetException ex) { 327 throw ex.getTargetException(); 328 } 329 } 330 } 331 332}