001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.connection; 018 019import javax.jms.Connection; 020import javax.jms.ConnectionFactory; 021import javax.jms.JMSException; 022import javax.jms.QueueConnection; 023import javax.jms.QueueConnectionFactory; 024import javax.jms.QueueSession; 025import javax.jms.Session; 026import javax.jms.TopicConnection; 027import javax.jms.TopicConnectionFactory; 028import javax.jms.TopicSession; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032 033import org.springframework.transaction.support.ResourceHolderSynchronization; 034import org.springframework.transaction.support.TransactionSynchronizationManager; 035import org.springframework.util.Assert; 036 037/** 038 * Helper class for managing a JMS {@link javax.jms.ConnectionFactory}, in particular 039 * for obtaining transactional JMS resources for a given ConnectionFactory. 040 * 041 * <p>Mainly for internal use within the framework. Used by 042 * {@link org.springframework.jms.core.JmsTemplate} as well as 043 * {@link org.springframework.jms.listener.DefaultMessageListenerContainer}. 044 * 045 * @author Juergen Hoeller 046 * @since 2.0 047 * @see SmartConnectionFactory 048 */ 049public abstract class ConnectionFactoryUtils { 050 051 private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class); 052 053 054 /** 055 * Release the given Connection, stopping it (if necessary) and eventually closing it. 056 * <p>Checks {@link SmartConnectionFactory#shouldStop}, if available. 057 * This is essentially a more sophisticated version of 058 * {@link org.springframework.jms.support.JmsUtils#closeConnection}. 059 * @param con the Connection to release 060 * (if this is {@code null}, the call will be ignored) 061 * @param cf the ConnectionFactory that the Connection was obtained from 062 * (may be {@code null}) 063 * @param started whether the Connection might have been started by the application 064 * @see SmartConnectionFactory#shouldStop 065 * @see org.springframework.jms.support.JmsUtils#closeConnection 066 */ 067 public static void releaseConnection(Connection con, ConnectionFactory cf, boolean started) { 068 if (con == null) { 069 return; 070 } 071 if (started && cf instanceof SmartConnectionFactory && ((SmartConnectionFactory) cf).shouldStop(con)) { 072 try { 073 con.stop(); 074 } 075 catch (Throwable ex) { 076 logger.debug("Could not stop JMS Connection before closing it", ex); 077 } 078 } 079 try { 080 con.close(); 081 } 082 catch (Throwable ex) { 083 logger.debug("Could not close JMS Connection", ex); 084 } 085 } 086 087 /** 088 * Return the innermost target Session of the given Session. If the given 089 * Session is a proxy, it will be unwrapped until a non-proxy Session is 090 * found. Otherwise, the passed-in Session will be returned as-is. 091 * @param session the Session proxy to unwrap 092 * @return the innermost target Session, or the passed-in one if no proxy 093 * @see SessionProxy#getTargetSession() 094 */ 095 public static Session getTargetSession(Session session) { 096 Session sessionToUse = session; 097 while (sessionToUse instanceof SessionProxy) { 098 sessionToUse = ((SessionProxy) sessionToUse).getTargetSession(); 099 } 100 return sessionToUse; 101 } 102 103 104 105 /** 106 * Determine whether the given JMS Session is transactional, that is, 107 * bound to the current thread by Spring's transaction facilities. 108 * @param session the JMS Session to check 109 * @param cf the JMS ConnectionFactory that the Session originated from 110 * @return whether the Session is transactional 111 */ 112 public static boolean isSessionTransactional(Session session, ConnectionFactory cf) { 113 if (session == null || cf == null) { 114 return false; 115 } 116 JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(cf); 117 return (resourceHolder != null && resourceHolder.containsSession(session)); 118 } 119 120 121 /** 122 * Obtain a JMS Session that is synchronized with the current transaction, if any. 123 * @param cf the ConnectionFactory to obtain a Session for 124 * @param existingCon the existing JMS Connection to obtain a Session for 125 * (may be {@code null}) 126 * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction 127 * that is synchronized with a Spring-managed transaction (where the main transaction 128 * might be a JDBC-based one for a specific DataSource, for example), with the JMS 129 * transaction committing right after the main transaction. If not allowed, the given 130 * ConnectionFactory needs to handle transaction enlistment underneath the covers. 131 * @return the transactional Session, or {@code null} if none found 132 * @throws JMSException in case of JMS failure 133 */ 134 public static Session getTransactionalSession(final ConnectionFactory cf, 135 final Connection existingCon, final boolean synchedLocalTransactionAllowed) 136 throws JMSException { 137 138 return doGetTransactionalSession(cf, new ResourceFactory() { 139 @Override 140 public Session getSession(JmsResourceHolder holder) { 141 return holder.getSession(Session.class, existingCon); 142 } 143 @Override 144 public Connection getConnection(JmsResourceHolder holder) { 145 return (existingCon != null ? existingCon : holder.getConnection()); 146 } 147 @Override 148 public Connection createConnection() throws JMSException { 149 return cf.createConnection(); 150 } 151 @Override 152 public Session createSession(Connection con) throws JMSException { 153 return con.createSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 154 } 155 @Override 156 public boolean isSynchedLocalTransactionAllowed() { 157 return synchedLocalTransactionAllowed; 158 } 159 }, true); 160 } 161 162 /** 163 * Obtain a JMS QueueSession that is synchronized with the current transaction, if any. 164 * <p>Mainly intended for use with the JMS 1.0.2 API. 165 * @param cf the ConnectionFactory to obtain a Session for 166 * @param existingCon the existing JMS Connection to obtain a Session for 167 * (may be {@code null}) 168 * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction 169 * that is synchronized with a Spring-managed transaction (where the main transaction 170 * might be a JDBC-based one for a specific DataSource, for example), with the JMS 171 * transaction committing right after the main transaction. If not allowed, the given 172 * ConnectionFactory needs to handle transaction enlistment underneath the covers. 173 * @return the transactional Session, or {@code null} if none found 174 * @throws JMSException in case of JMS failure 175 */ 176 public static QueueSession getTransactionalQueueSession(final QueueConnectionFactory cf, 177 final QueueConnection existingCon, final boolean synchedLocalTransactionAllowed) 178 throws JMSException { 179 180 return (QueueSession) doGetTransactionalSession(cf, new ResourceFactory() { 181 @Override 182 public Session getSession(JmsResourceHolder holder) { 183 return holder.getSession(QueueSession.class, existingCon); 184 } 185 @Override 186 public Connection getConnection(JmsResourceHolder holder) { 187 return (existingCon != null ? existingCon : holder.getConnection(QueueConnection.class)); 188 } 189 @Override 190 public Connection createConnection() throws JMSException { 191 return cf.createQueueConnection(); 192 } 193 @Override 194 public Session createSession(Connection con) throws JMSException { 195 return ((QueueConnection) con).createQueueSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 196 } 197 @Override 198 public boolean isSynchedLocalTransactionAllowed() { 199 return synchedLocalTransactionAllowed; 200 } 201 }, true); 202 } 203 204 /** 205 * Obtain a JMS TopicSession that is synchronized with the current transaction, if any. 206 * <p>Mainly intended for use with the JMS 1.0.2 API. 207 * @param cf the ConnectionFactory to obtain a Session for 208 * @param existingCon the existing JMS Connection to obtain a Session for 209 * (may be {@code null}) 210 * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction 211 * that is synchronized with a Spring-managed transaction (where the main transaction 212 * might be a JDBC-based one for a specific DataSource, for example), with the JMS 213 * transaction committing right after the main transaction. If not allowed, the given 214 * ConnectionFactory needs to handle transaction enlistment underneath the covers. 215 * @return the transactional Session, or {@code null} if none found 216 * @throws JMSException in case of JMS failure 217 */ 218 public static TopicSession getTransactionalTopicSession(final TopicConnectionFactory cf, 219 final TopicConnection existingCon, final boolean synchedLocalTransactionAllowed) 220 throws JMSException { 221 222 return (TopicSession) doGetTransactionalSession(cf, new ResourceFactory() { 223 @Override 224 public Session getSession(JmsResourceHolder holder) { 225 return holder.getSession(TopicSession.class, existingCon); 226 } 227 @Override 228 public Connection getConnection(JmsResourceHolder holder) { 229 return (existingCon != null ? existingCon : holder.getConnection(TopicConnection.class)); 230 } 231 @Override 232 public Connection createConnection() throws JMSException { 233 return cf.createTopicConnection(); 234 } 235 @Override 236 public Session createSession(Connection con) throws JMSException { 237 return ((TopicConnection) con).createTopicSession( 238 synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 239 } 240 @Override 241 public boolean isSynchedLocalTransactionAllowed() { 242 return synchedLocalTransactionAllowed; 243 } 244 }, true); 245 } 246 247 /** 248 * Obtain a JMS Session that is synchronized with the current transaction, if any. 249 * <p>This {@code doGetTransactionalSession} variant always starts the underlying 250 * JMS Connection, assuming that the Session will be used for receiving messages. 251 * @param connectionFactory the JMS ConnectionFactory to bind for 252 * (used as TransactionSynchronizationManager key) 253 * @param resourceFactory the ResourceFactory to use for extracting or creating 254 * JMS resources 255 * @return the transactional Session, or {@code null} if none found 256 * @throws JMSException in case of JMS failure 257 * @see #doGetTransactionalSession(javax.jms.ConnectionFactory, ResourceFactory, boolean) 258 */ 259 public static Session doGetTransactionalSession( 260 ConnectionFactory connectionFactory, ResourceFactory resourceFactory) throws JMSException { 261 262 return doGetTransactionalSession(connectionFactory, resourceFactory, true); 263 } 264 265 /** 266 * Obtain a JMS Session that is synchronized with the current transaction, if any. 267 * @param connectionFactory the JMS ConnectionFactory to bind for 268 * (used as TransactionSynchronizationManager key) 269 * @param resourceFactory the ResourceFactory to use for extracting or creating 270 * JMS resources 271 * @param startConnection whether the underlying JMS Connection approach should be 272 * started in order to allow for receiving messages. Note that a reused Connection 273 * may already have been started before, even if this flag is {@code false}. 274 * @return the transactional Session, or {@code null} if none found 275 * @throws JMSException in case of JMS failure 276 */ 277 public static Session doGetTransactionalSession( 278 ConnectionFactory connectionFactory, ResourceFactory resourceFactory, boolean startConnection) 279 throws JMSException { 280 281 Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); 282 Assert.notNull(resourceFactory, "ResourceFactory must not be null"); 283 284 JmsResourceHolder resourceHolder = 285 (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); 286 if (resourceHolder != null) { 287 Session session = resourceFactory.getSession(resourceHolder); 288 if (session != null) { 289 if (startConnection) { 290 Connection con = resourceFactory.getConnection(resourceHolder); 291 if (con != null) { 292 con.start(); 293 } 294 } 295 return session; 296 } 297 if (resourceHolder.isFrozen()) { 298 return null; 299 } 300 } 301 if (!TransactionSynchronizationManager.isSynchronizationActive()) { 302 return null; 303 } 304 JmsResourceHolder resourceHolderToUse = resourceHolder; 305 if (resourceHolderToUse == null) { 306 resourceHolderToUse = new JmsResourceHolder(connectionFactory); 307 } 308 Connection con = resourceFactory.getConnection(resourceHolderToUse); 309 Session session = null; 310 try { 311 boolean isExistingCon = (con != null); 312 if (!isExistingCon) { 313 con = resourceFactory.createConnection(); 314 resourceHolderToUse.addConnection(con); 315 } 316 session = resourceFactory.createSession(con); 317 resourceHolderToUse.addSession(session, con); 318 if (startConnection) { 319 con.start(); 320 } 321 } 322 catch (JMSException ex) { 323 if (session != null) { 324 try { 325 session.close(); 326 } 327 catch (Throwable ex2) { 328 // ignore 329 } 330 } 331 if (con != null) { 332 try { 333 con.close(); 334 } 335 catch (Throwable ex2) { 336 // ignore 337 } 338 } 339 throw ex; 340 } 341 if (resourceHolderToUse != resourceHolder) { 342 TransactionSynchronizationManager.registerSynchronization( 343 new JmsResourceSynchronization(resourceHolderToUse, connectionFactory, 344 resourceFactory.isSynchedLocalTransactionAllowed())); 345 resourceHolderToUse.setSynchronizedWithTransaction(true); 346 TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse); 347 } 348 return session; 349 } 350 351 352 /** 353 * Callback interface for resource creation. 354 * Serving as argument for the {@code doGetTransactionalSession} method. 355 */ 356 public interface ResourceFactory { 357 358 /** 359 * Fetch an appropriate Session from the given JmsResourceHolder. 360 * @param holder the JmsResourceHolder 361 * @return an appropriate Session fetched from the holder, 362 * or {@code null} if none found 363 */ 364 Session getSession(JmsResourceHolder holder); 365 366 /** 367 * Fetch an appropriate Connection from the given JmsResourceHolder. 368 * @param holder the JmsResourceHolder 369 * @return an appropriate Connection fetched from the holder, 370 * or {@code null} if none found 371 */ 372 Connection getConnection(JmsResourceHolder holder); 373 374 /** 375 * Create a new JMS Connection for registration with a JmsResourceHolder. 376 * @return the new JMS Connection 377 * @throws JMSException if thrown by JMS API methods 378 */ 379 Connection createConnection() throws JMSException; 380 381 /** 382 * Create a new JMS Session for registration with a JmsResourceHolder. 383 * @param con the JMS Connection to create a Session for 384 * @return the new JMS Session 385 * @throws JMSException if thrown by JMS API methods 386 */ 387 Session createSession(Connection con) throws JMSException; 388 389 /** 390 * Return whether to allow for a local JMS transaction that is synchronized with 391 * a Spring-managed transaction (where the main transaction might be a JDBC-based 392 * one for a specific DataSource, for example), with the JMS transaction 393 * committing right after the main transaction. 394 * @return whether to allow for synchronizing a local JMS transaction 395 */ 396 boolean isSynchedLocalTransactionAllowed(); 397 } 398 399 400 /** 401 * Callback for resource cleanup at the end of a non-native JMS transaction 402 * (e.g. when participating in a JtaTransactionManager transaction). 403 * @see org.springframework.transaction.jta.JtaTransactionManager 404 */ 405 private static class JmsResourceSynchronization extends ResourceHolderSynchronization<JmsResourceHolder, Object> { 406 407 private final boolean transacted; 408 409 public JmsResourceSynchronization(JmsResourceHolder resourceHolder, Object resourceKey, boolean transacted) { 410 super(resourceHolder, resourceKey); 411 this.transacted = transacted; 412 } 413 414 @Override 415 protected boolean shouldReleaseBeforeCompletion() { 416 return !this.transacted; 417 } 418 419 @Override 420 protected void processResourceAfterCommit(JmsResourceHolder resourceHolder) { 421 try { 422 resourceHolder.commitAll(); 423 } 424 catch (JMSException ex) { 425 throw new SynchedLocalTransactionFailedException("Local JMS transaction failed to commit", ex); 426 } 427 } 428 429 @Override 430 protected void releaseResource(JmsResourceHolder resourceHolder, Object resourceKey) { 431 resourceHolder.closeAll(); 432 } 433 } 434 435}