001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.connection; 018 019import java.lang.reflect.InvocationHandler; 020import java.lang.reflect.InvocationTargetException; 021import java.lang.reflect.Method; 022import java.lang.reflect.Proxy; 023import java.util.ArrayList; 024import java.util.List; 025 026import javax.jms.Connection; 027import javax.jms.ConnectionFactory; 028import javax.jms.JMSContext; 029import javax.jms.JMSException; 030import javax.jms.QueueConnection; 031import javax.jms.QueueConnectionFactory; 032import javax.jms.QueueSession; 033import javax.jms.Session; 034import javax.jms.TopicConnection; 035import javax.jms.TopicConnectionFactory; 036import javax.jms.TopicSession; 037import javax.jms.TransactionInProgressException; 038 039import org.springframework.lang.Nullable; 040import org.springframework.util.Assert; 041import org.springframework.util.ClassUtils; 042 043/** 044 * Proxy for a target JMS {@link javax.jms.ConnectionFactory}, adding awareness of 045 * Spring-managed transactions. Similar to a transactional JNDI ConnectionFactory 046 * as provided by a Java EE application server. 047 * 048 * <p>Messaging code which should remain unaware of Spring's JMS support can work with 049 * this proxy to seamlessly participate in Spring-managed transactions. Note that the 050 * transaction manager, for example {@link JmsTransactionManager}, still needs to work 051 * with the underlying ConnectionFactory, <i>not</i> with this proxy. 052 * 053 * <p><b>Make sure that TransactionAwareConnectionFactoryProxy is the outermost 054 * ConnectionFactory of a chain of ConnectionFactory proxies/adapters.</b> 055 * TransactionAwareConnectionFactoryProxy can delegate either directly to the 056 * target factory or to some intermediary adapter like 057 * {@link UserCredentialsConnectionFactoryAdapter}. 058 * 059 * <p>Delegates to {@link ConnectionFactoryUtils} for automatically participating 060 * in thread-bound transactions, for example managed by {@link JmsTransactionManager}. 061 * {@code createSession} calls and {@code close} calls on returned Sessions 062 * will behave properly within a transaction, that is, always work on the transactional 063 * Session. If not within a transaction, normal ConnectionFactory behavior applies. 064 * 065 * <p>Note that transactional JMS Sessions will be registered on a per-Connection 066 * basis. To share the same JMS Session across a transaction, make sure that you 067 * operate on the same JMS Connection handle - either through reusing the handle 068 * or through configuring a {@link SingleConnectionFactory} underneath. 069 * 070 * <p>Returned transactional Session proxies will implement the {@link SessionProxy} 071 * interface to allow for access to the underlying target Session. This is only 072 * intended for accessing vendor-specific Session API or for testing purposes 073 * (e.g. to perform manual transaction control). For typical application purposes, 074 * simply use the standard JMS Session interface. 075 * 076 * <p>As of Spring Framework 5, this class delegates JMS 2.0 {@code JMSContext} 077 * calls and therefore requires the JMS 2.0 API to be present at runtime. 078 * It may nevertheless run against a JMS 1.1 driver (bound to the JMS 2.0 API) 079 * as long as no actual JMS 2.0 calls are triggered by the application's setup. 080 * 081 * @author Juergen Hoeller 082 * @since 2.0 083 * @see UserCredentialsConnectionFactoryAdapter 084 * @see SingleConnectionFactory 085 */ 086public class TransactionAwareConnectionFactoryProxy 087 implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory { 088 089 @Nullable 090 private ConnectionFactory targetConnectionFactory; 091 092 private boolean synchedLocalTransactionAllowed = false; 093 094 095 /** 096 * Create a new TransactionAwareConnectionFactoryProxy. 097 */ 098 public TransactionAwareConnectionFactoryProxy() { 099 } 100 101 /** 102 * Create a new TransactionAwareConnectionFactoryProxy. 103 * @param targetConnectionFactory the target ConnectionFactory 104 */ 105 public TransactionAwareConnectionFactoryProxy(ConnectionFactory targetConnectionFactory) { 106 setTargetConnectionFactory(targetConnectionFactory); 107 } 108 109 110 /** 111 * Set the target ConnectionFactory that this ConnectionFactory should delegate to. 112 */ 113 public final void setTargetConnectionFactory(ConnectionFactory targetConnectionFactory) { 114 Assert.notNull(targetConnectionFactory, "'targetConnectionFactory' must not be null"); 115 this.targetConnectionFactory = targetConnectionFactory; 116 } 117 118 /** 119 * Return the target ConnectionFactory that this ConnectionFactory should delegate to. 120 */ 121 protected ConnectionFactory getTargetConnectionFactory() { 122 ConnectionFactory target = this.targetConnectionFactory; 123 Assert.state(target != null, "'targetConnectionFactory' is required"); 124 return target; 125 } 126 127 /** 128 * Set whether to allow for a local JMS transaction that is synchronized with a 129 * Spring-managed transaction (where the main transaction might be a JDBC-based 130 * one for a specific DataSource, for example), with the JMS transaction committing 131 * right after the main transaction. If not allowed, the given ConnectionFactory 132 * needs to handle transaction enlistment underneath the covers. 133 * <p>Default is "false": If not within a managed transaction that encompasses 134 * the underlying JMS ConnectionFactory, standard Sessions will be returned. 135 * Turn this flag on to allow participation in any Spring-managed transaction, 136 * with a local JMS transaction synchronized with the main transaction. 137 */ 138 public void setSynchedLocalTransactionAllowed(boolean synchedLocalTransactionAllowed) { 139 this.synchedLocalTransactionAllowed = synchedLocalTransactionAllowed; 140 } 141 142 /** 143 * Return whether to allow for a local JMS transaction that is synchronized 144 * with a Spring-managed transaction. 145 */ 146 protected boolean isSynchedLocalTransactionAllowed() { 147 return this.synchedLocalTransactionAllowed; 148 } 149 150 151 @Override 152 public Connection createConnection() throws JMSException { 153 Connection targetConnection = getTargetConnectionFactory().createConnection(); 154 return getTransactionAwareConnectionProxy(targetConnection); 155 } 156 157 @Override 158 public Connection createConnection(String username, String password) throws JMSException { 159 Connection targetConnection = getTargetConnectionFactory().createConnection(username, password); 160 return getTransactionAwareConnectionProxy(targetConnection); 161 } 162 163 @Override 164 public QueueConnection createQueueConnection() throws JMSException { 165 ConnectionFactory target = getTargetConnectionFactory(); 166 if (!(target instanceof QueueConnectionFactory)) { 167 throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no QueueConnectionFactory"); 168 } 169 QueueConnection targetConnection = ((QueueConnectionFactory) target).createQueueConnection(); 170 return (QueueConnection) getTransactionAwareConnectionProxy(targetConnection); 171 } 172 173 @Override 174 public QueueConnection createQueueConnection(String username, String password) throws JMSException { 175 ConnectionFactory target = getTargetConnectionFactory(); 176 if (!(target instanceof QueueConnectionFactory)) { 177 throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no QueueConnectionFactory"); 178 } 179 QueueConnection targetConnection = ((QueueConnectionFactory) target).createQueueConnection(username, password); 180 return (QueueConnection) getTransactionAwareConnectionProxy(targetConnection); 181 } 182 183 @Override 184 public TopicConnection createTopicConnection() throws JMSException { 185 ConnectionFactory target = getTargetConnectionFactory(); 186 if (!(target instanceof TopicConnectionFactory)) { 187 throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory"); 188 } 189 TopicConnection targetConnection = ((TopicConnectionFactory) target).createTopicConnection(); 190 return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection); 191 } 192 193 @Override 194 public TopicConnection createTopicConnection(String username, String password) throws JMSException { 195 ConnectionFactory target = getTargetConnectionFactory(); 196 if (!(target instanceof TopicConnectionFactory)) { 197 throw new javax.jms.IllegalStateException("'targetConnectionFactory' is no TopicConnectionFactory"); 198 } 199 TopicConnection targetConnection = ((TopicConnectionFactory) target).createTopicConnection(username, password); 200 return (TopicConnection) getTransactionAwareConnectionProxy(targetConnection); 201 } 202 203 @Override 204 public JMSContext createContext() { 205 return getTargetConnectionFactory().createContext(); 206 } 207 208 @Override 209 public JMSContext createContext(String userName, String password) { 210 return getTargetConnectionFactory().createContext(userName, password); 211 } 212 213 @Override 214 public JMSContext createContext(String userName, String password, int sessionMode) { 215 return getTargetConnectionFactory().createContext(userName, password, sessionMode); 216 } 217 218 @Override 219 public JMSContext createContext(int sessionMode) { 220 return getTargetConnectionFactory().createContext(sessionMode); 221 } 222 223 224 /** 225 * Wrap the given Connection with a proxy that delegates every method call to it 226 * but handles Session lookup in a transaction-aware fashion. 227 * @param target the original Connection to wrap 228 * @return the wrapped Connection 229 */ 230 protected Connection getTransactionAwareConnectionProxy(Connection target) { 231 List<Class<?>> classes = new ArrayList<>(3); 232 classes.add(Connection.class); 233 if (target instanceof QueueConnection) { 234 classes.add(QueueConnection.class); 235 } 236 if (target instanceof TopicConnection) { 237 classes.add(TopicConnection.class); 238 } 239 return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), 240 ClassUtils.toClassArray(classes), new TransactionAwareConnectionInvocationHandler(target)); 241 } 242 243 244 /** 245 * Invocation handler that exposes transactional Sessions for the underlying Connection. 246 */ 247 private class TransactionAwareConnectionInvocationHandler implements InvocationHandler { 248 249 private final Connection target; 250 251 public TransactionAwareConnectionInvocationHandler(Connection target) { 252 this.target = target; 253 } 254 255 @Override 256 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 257 // Invocation on ConnectionProxy interface coming in... 258 259 if (method.getName().equals("equals")) { 260 // Only consider equal when proxies are identical. 261 return (proxy == args[0]); 262 } 263 else if (method.getName().equals("hashCode")) { 264 // Use hashCode of Connection proxy. 265 return System.identityHashCode(proxy); 266 } 267 else if (Session.class == method.getReturnType()) { 268 Session session = ConnectionFactoryUtils.getTransactionalSession( 269 getTargetConnectionFactory(), this.target, isSynchedLocalTransactionAllowed()); 270 if (session != null) { 271 return getCloseSuppressingSessionProxy(session); 272 } 273 } 274 else if (QueueSession.class == method.getReturnType()) { 275 QueueSession session = ConnectionFactoryUtils.getTransactionalQueueSession( 276 (QueueConnectionFactory) getTargetConnectionFactory(), (QueueConnection) this.target, 277 isSynchedLocalTransactionAllowed()); 278 if (session != null) { 279 return getCloseSuppressingSessionProxy(session); 280 } 281 } 282 else if (TopicSession.class == method.getReturnType()) { 283 TopicSession session = ConnectionFactoryUtils.getTransactionalTopicSession( 284 (TopicConnectionFactory) getTargetConnectionFactory(), (TopicConnection) this.target, 285 isSynchedLocalTransactionAllowed()); 286 if (session != null) { 287 return getCloseSuppressingSessionProxy(session); 288 } 289 } 290 291 // Invoke method on target Connection. 292 try { 293 return method.invoke(this.target, args); 294> 295 catch (InvocationTargetException ex) { 296 throw ex.getTargetException(); 297 } 298 } 299 300 private Session getCloseSuppressingSessionProxy(Session target) { 301 List<Class<?>> classes = new ArrayList<>(3); 302 classes.add(SessionProxy.class); 303 if (target instanceof QueueSession) { 304 classes.add(QueueSession.class); 305 } 306 if (target instanceof TopicSession) { 307 classes.add(TopicSession.class); 308 } 309 return (Session) Proxy.newProxyInstance(SessionProxy.class.getClassLoader(), 310 ClassUtils.toClassArray(classes), new CloseSuppressingSessionInvocationHandler(target)); 311 } 312 } 313 314 315 /** 316 * Invocation handler that suppresses close calls for a transactional JMS Session. 317 */ 318 private static class CloseSuppressingSessionInvocationHandler implements InvocationHandler { 319 320 private final Session target; 321 322 public CloseSuppressingSessionInvocationHandler(Session target) { 323 this.target = target; 324 } 325 326 @Override 327 @Nullable 328 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 329 // Invocation on SessionProxy interface coming in... 330 331 if (method.getName().equals("equals")) { 332 // Only consider equal when proxies are identical. 333 return (proxy == args[0]); 334 } 335 else if (method.getName().equals("hashCode")) { 336 // Use hashCode of Connection proxy. 337 return System.identityHashCode(proxy); 338 } 339 else if (method.getName().equals("commit")) { 340 throw new TransactionInProgressException("Commit call not allowed within a managed transaction"); 341 } 342 else if (method.getName().equals("rollback")) { 343 throw new TransactionInProgressException("Rollback call not allowed within a managed transaction"); 344 } 345 else if (method.getName().equals("close")) { 346 // Handle close method: not to be closed within a transaction. 347 return null; 348 } 349 else if (method.getName().equals("getTargetSession")) { 350 // Handle getTargetSession method: return underlying Session. 351 return this.target; 352 } 353 354 // Invoke method on target Session. 355 try { 356 return method.invoke(this.target, args); 357 } 358 catch (InvocationTargetException ex) { 359 throw ex.getTargetException(); 360 } 361 } 362 } 363 364}