001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.connection; 018 019import javax.jms.Connection; 020import javax.jms.ConnectionFactory; 021import javax.jms.JMSException; 022import javax.jms.Session; 023import javax.jms.TransactionRolledBackException; 024 025import org.springframework.beans.factory.InitializingBean; 026import org.springframework.lang.Nullable; 027import org.springframework.transaction.CannotCreateTransactionException; 028import org.springframework.transaction.InvalidIsolationLevelException; 029import org.springframework.transaction.TransactionDefinition; 030import org.springframework.transaction.TransactionSystemException; 031import org.springframework.transaction.UnexpectedRollbackException; 032import org.springframework.transaction.support.AbstractPlatformTransactionManager; 033import org.springframework.transaction.support.DefaultTransactionStatus; 034import org.springframework.transaction.support.ResourceTransactionManager; 035import org.springframework.transaction.support.SmartTransactionObject; 036import org.springframework.transaction.support.TransactionSynchronizationManager; 037import org.springframework.util.Assert; 038 039/** 040 * {@link org.springframework.transaction.PlatformTransactionManager} implementation 041 * for a single JMS {@link javax.jms.ConnectionFactory}. Binds a JMS 042 * Connection/Session pair from the specified ConnectionFactory to the thread, 043 * potentially allowing for one thread-bound Session per ConnectionFactory. 044 * 045 * <p>This local strategy is an alternative to executing JMS operations within 046 * JTA transactions. Its advantage is that it is able to work in any environment, 047 * for example a standalone application or a test suite, with any message broker 048 * as target. However, this strategy is <i>not</i> able to provide XA transactions, 049 * for example in order to share transactions between messaging and database access. 050 * A full JTA/XA setup is required for XA transactions, typically using Spring's 051 * {@link org.springframework.transaction.jta.JtaTransactionManager} as strategy. 052 * 053 * <p>Application code is required to retrieve the transactional JMS Session via 054 * {@link ConnectionFactoryUtils#getTransactionalSession} instead of a standard 055 * Java EE-style {@link ConnectionFactory#createConnection()} call with subsequent 056 * Session creation. Spring's {@link org.springframework.jms.core.JmsTemplate} 057 * will autodetect a thread-bound Session and automatically participate in it. 058 * 059 * <p>Alternatively, you can allow application code to work with the standard 060 * Java EE-style lookup pattern on a ConnectionFactory, for example for legacy code 061 * that is not aware of Spring at all. In that case, define a 062 * {@link TransactionAwareConnectionFactoryProxy} for your target ConnectionFactory, 063 * which will automatically participate in Spring-managed transactions. 064 * 065 * <p><b>The use of {@link CachingConnectionFactory} as a target for this 066 * transaction manager is strongly recommended.</b> CachingConnectionFactory 067 * uses a single JMS Connection for all JMS access in order to avoid the overhead 068 * of repeated Connection creation, as well as maintaining a cache of Sessions. 069 * Each transaction will then share the same JMS Connection, while still using 070 * its own individual JMS Session. 071 * 072 * <p>The use of a <i>raw</i> target ConnectionFactory would not only be inefficient 073 * because of the lack of resource reuse. It might also lead to strange effects 074 * when your JMS driver doesn't accept {@code MessageProducer.close()} calls 075 * and/or {@code MessageConsumer.close()} calls before {@code Session.commit()}, 076 * with the latter supposed to commit all the messages that have been sent through the 077 * producer handle and received through the consumer handle. As a safe general solution, 078 * always pass in a {@link CachingConnectionFactory} into this transaction manager's 079 * {@link #setConnectionFactory "connectionFactory"} property. 080 * 081 * <p>Transaction synchronization is turned off by default, as this manager might 082 * be used alongside a datastore-based Spring transaction manager such as the 083 * JDBC {@link org.springframework.jdbc.datasource.DataSourceTransactionManager}, 084 * which has stronger needs for synchronization. 085 * 086 * @author Juergen Hoeller 087 * @since 1.1 088 * @see ConnectionFactoryUtils#getTransactionalSession 089 * @see TransactionAwareConnectionFactoryProxy 090 * @see org.springframework.jms.core.JmsTemplate 091 */ 092@SuppressWarnings("serial") 093public class JmsTransactionManager extends AbstractPlatformTransactionManager 094 implements ResourceTransactionManager, InitializingBean { 095 096 @Nullable 097 private ConnectionFactory connectionFactory; 098 099 private boolean lazyResourceRetrieval = false; 100 101 102 /** 103 * Create a new JmsTransactionManager for bean-style usage. 104 * <p>Note: The ConnectionFactory has to be set before using the instance. 105 * This constructor can be used to prepare a JmsTemplate via a BeanFactory, 106 * typically setting the ConnectionFactory via setConnectionFactory. 107 * <p>Turns off transaction synchronization by default, as this manager might 108 * be used alongside a datastore-based Spring transaction manager like 109 * DataSourceTransactionManager, which has stronger needs for synchronization. 110 * Only one manager is allowed to drive synchronization at any point of time. 111 * @see #setConnectionFactory 112 * @see #setTransactionSynchronization 113 */ 114 public JmsTransactionManager() { 115 setTransactionSynchronization(SYNCHRONIZATION_NEVER); 116 } 117 118 /** 119 * Create a new JmsTransactionManager, given a ConnectionFactory. 120 * @param connectionFactory the ConnectionFactory to obtain connections from 121 */ 122 public JmsTransactionManager(ConnectionFactory connectionFactory) { 123 this(); 124 setConnectionFactory(connectionFactory); 125 afterPropertiesSet(); 126 } 127 128 129 /** 130 * Set the JMS ConnectionFactory that this instance should manage transactions for. 131 */ 132 public void setConnectionFactory(@Nullable ConnectionFactory cf) { 133 if (cf instanceof TransactionAwareConnectionFactoryProxy) { 134 // If we got a TransactionAwareConnectionFactoryProxy, we need to perform transactions 135 // for its underlying target ConnectionFactory, else JMS access code won't see 136 // properly exposed transactions (i.e. transactions for the target ConnectionFactory). 137 this.connectionFactory = ((TransactionAwareConnectionFactoryProxy) cf).getTargetConnectionFactory(); 138 } 139 else { 140 this.connectionFactory = cf; 141 } 142 } 143 144 /** 145 * Return the JMS ConnectionFactory that this instance should manage transactions for. 146 */ 147 @Nullable 148 public ConnectionFactory getConnectionFactory() { 149 return this.connectionFactory; 150 } 151 152 /** 153 * Obtain the ConnectionFactory for actual use. 154 * @return the ConnectionFactory (never {@code null}) 155 * @throws IllegalStateException in case of no ConnectionFactory set 156 * @since 5.0 157 */ 158 protected final ConnectionFactory obtainConnectionFactory() { 159 ConnectionFactory connectionFactory = getConnectionFactory(); 160 Assert.state(connectionFactory != null, "No ConnectionFactory set"); 161 return connectionFactory; 162 } 163 164 /** 165 * Specify whether this transaction manager should lazily retrieve a JMS 166 * Connection and Session on access within a transaction ({@code true}). 167 * By default, it will eagerly create a JMS Connection and Session at 168 * transaction begin ({@code false}). 169 * @since 5.1.6 170 * @see JmsResourceHolder#getConnection() 171 * @see JmsResourceHolder#getSession() 172 */ 173 public void setLazyResourceRetrieval(boolean lazyResourceRetrieval) { 174 this.lazyResourceRetrieval = lazyResourceRetrieval; 175 } 176 177 /** 178 * Make sure the ConnectionFactory has been set. 179 */ 180 @Override 181 public void afterPropertiesSet() { 182 if (getConnectionFactory() == null) { 183 throw new IllegalArgumentException("Property 'connectionFactory' is required"); 184 } 185 } 186 187 188 @Override 189 public Object getResourceFactory() { 190 return obtainConnectionFactory(); 191 } 192 193 @Override 194 protected Object doGetTransaction() { 195 JmsTransactionObject txObject = new JmsTransactionObject(); 196 txObject.setResourceHolder( 197 (JmsResourceHolder) TransactionSynchronizationManager.getResource(obtainConnectionFactory())); 198 return txObject; 199 } 200 201 @Override 202 protected boolean isExistingTransaction(Object transaction) { 203 JmsTransactionObject txObject = (JmsTransactionObject) transaction; 204 return txObject.hasResourceHolder(); 205 } 206 207 @Override 208 protected void doBegin(Object transaction, TransactionDefinition definition) { 209 if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { 210 throw new InvalidIsolationLevelException("JMS does not support an isolation level concept"); 211 } 212 213 ConnectionFactory connectionFactory = obtainConnectionFactory(); 214 JmsTransactionObject txObject = (JmsTransactionObject) transaction; 215 Connection con = null; 216 Session session = null; 217 try { 218 JmsResourceHolder resourceHolder; 219 if (this.lazyResourceRetrieval) { 220 resourceHolder = new LazyJmsResourceHolder(connectionFactory); 221 } 222 else { 223 con = createConnection(); 224 session = createSession(con); 225 if (logger.isDebugEnabled()) { 226 logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]"); 227 } 228 resourceHolder = new JmsResourceHolder(connectionFactory, con, session); 229 } 230 resourceHolder.setSynchronizedWithTransaction(true); 231 int timeout = determineTimeout(definition); 232 if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { 233 resourceHolder.setTimeoutInSeconds(timeout); 234 } 235 txObject.setResourceHolder(resourceHolder); 236 TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder); 237 } 238 catch (Throwable ex) { 239 if (session != null) { 240 try { 241 session.close(); 242 } 243 catch (Throwable ex2) { 244 // ignore 245 } 246 } 247 if (con != null) { 248 try { 249 con.close(); 250 } 251 catch (Throwable ex2) { 252 // ignore 253 } 254 } 255 throw new CannotCreateTransactionException("Could not create JMS transaction", ex); 256 } 257 } 258 259 @Override 260 protected Object doSuspend(Object transaction) { 261 JmsTransactionObject txObject = (JmsTransactionObject) transaction; 262 txObject.setResourceHolder(null); 263 return TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); 264 } 265 266 @Override 267 protected void doResume(@Nullable Object transaction, Object suspendedResources) { 268 TransactionSynchronizationManager.bindResource(obtainConnectionFactory(), suspendedResources); 269 } 270 271 @Override 272 protected void doCommit(DefaultTransactionStatus status) { 273 JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction(); 274 Session session = txObject.getResourceHolder().getOriginalSession(); 275 if (session != null) { 276 try { 277 if (status.isDebug()) { 278 logger.debug("Committing JMS transaction on Session [" + session + "]"); 279 } 280 session.commit(); 281 } 282 catch (TransactionRolledBackException ex) { 283 throw new UnexpectedRollbackException("JMS transaction rolled back", ex); 284 } 285 catch (JMSException ex) { 286 throw new TransactionSystemException("Could not commit JMS transaction", ex); 287 } 288 } 289 } 290 291 @Override 292 protected void doRollback(DefaultTransactionStatus status) { 293 JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction(); 294 Session session = txObject.getResourceHolder().getOriginalSession(); 295 if (session != null) { 296 try { 297 if (status.isDebug()) { 298 logger.debug("Rolling back JMS transaction on Session [" + session + "]"); 299 } 300 session.rollback(); 301 } 302 catch (JMSException ex) { 303 throw new TransactionSystemException("Could not roll back JMS transaction", ex); 304 } 305 } 306 } 307 308 @Override 309 protected void doSetRollbackOnly(DefaultTransactionStatus status) { 310 JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction(); 311 txObject.getResourceHolder().setRollbackOnly(); 312 } 313 314 @Override 315 protected void doCleanupAfterCompletion(Object transaction) { 316 JmsTransactionObject txObject = (JmsTransactionObject) transaction; 317 TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); 318 txObject.getResourceHolder().closeAll(); 319 txObject.getResourceHolder().clear(); 320 } 321 322 323 /** 324 * Create a JMS Connection via this template's ConnectionFactory. 325 * <p>This implementation uses JMS 1.1 API. 326 * @return the new JMS Connection 327 * @throws javax.jms.JMSException if thrown by JMS API methods 328 */ 329 protected Connection createConnection() throws JMSException { 330 return obtainConnectionFactory().createConnection(); 331 } 332 333 /** 334 * Create a JMS Session for the given Connection. 335 * <p>This implementation uses JMS 1.1 API. 336 * @param con the JMS Connection to create a Session for 337 * @return the new JMS Session 338 * @throws javax.jms.JMSException if thrown by JMS API methods 339 */ 340 protected Session createSession(Connection con) throws JMSException { 341 return con.createSession(true, Session.AUTO_ACKNOWLEDGE); 342 } 343 344 345 /** 346 * Lazily initializing variant of {@link JmsResourceHolder}, 347 * initializing a JMS Connection and Session on user access. 348 */ 349 private class LazyJmsResourceHolder extends JmsResourceHolder { 350 351 private boolean connectionInitialized = false; 352 353 private boolean sessionInitialized = false; 354 355 public LazyJmsResourceHolder(@Nullable ConnectionFactory connectionFactory) { 356 super(connectionFactory); 357 } 358 359 @Override 360 @Nullable 361 public Connection getConnection() { 362 initializeConnection(); 363 return super.getConnection(); 364 } 365 366 @Override 367 @Nullable 368 public <C extends Connection> C getConnection(Class<C> connectionType) { 369 initializeConnection(); 370 return super.getConnection(connectionType); 371 } 372 373 @Override 374 @Nullable 375 public Session getSession() { 376 initializeSession(); 377 return super.getSession(); 378 } 379 380 @Override 381 @Nullable 382 public <S extends Session> S getSession(Class<S> sessionType) { 383 initializeSession(); 384 return super.getSession(sessionType); 385 } 386 387 @Override 388 @Nullable 389 public <S extends Session> S getSession(Class<S> sessionType, @Nullable Connection connection) { 390 initializeSession(); 391 return super.getSession(sessionType, connection); 392 } 393 394 private void initializeConnection() { 395 if (!this.connectionInitialized) { 396 try { 397 addConnection(createConnection()); 398 } 399 catch (JMSException ex) { 400 throw new CannotCreateTransactionException( 401 "Failed to lazily initialize JMS Connection for transaction", ex); 402 } 403 this.connectionInitialized = true; 404 } 405 } 406 407 private void initializeSession() { 408 if (!this.sessionInitialized) { 409 Connection con = getConnection(); 410 Assert.state(con != null, "No transactional JMS Connection"); 411 try { 412 addSession(createSession(con), con); 413 } 414 catch (JMSException ex) { 415 throw new CannotCreateTransactionException( 416 "Failed to lazily initialize JMS Session for transaction", ex); 417 } 418 this.sessionInitialized = true; 419 } 420 } 421 } 422 423 424 /** 425 * JMS transaction object, representing a JmsResourceHolder. 426 * Used as transaction object by JmsTransactionManager. 427 * @see JmsResourceHolder 428 */ 429 private static class JmsTransactionObject implements SmartTransactionObject { 430 431 @Nullable 432 private JmsResourceHolder resourceHolder; 433 434 public void setResourceHolder(@Nullable JmsResourceHolder resourceHolder) { 435 this.resourceHolder = resourceHolder; 436 } 437 438 public JmsResourceHolder getResourceHolder() { 439 Assert.state(this.resourceHolder != null, "No JmsResourceHolder available"); 440 return this.resourceHolder; 441 } 442 443 public boolean hasResourceHolder() { 444 return (this.resourceHolder != null); 445 } 446 447 @Override 448 public boolean isRollbackOnly() { 449 return (this.resourceHolder != null && this.resourceHolder.isRollbackOnly()); 450 } 451 452 @Override 453 public void flush() { 454 // no-op 455 } 456 } 457 458}