001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.connection; 018 019import javax.jms.Connection; 020import javax.jms.ConnectionFactory; 021import javax.jms.JMSException; 022import javax.jms.QueueConnection; 023import javax.jms.QueueConnectionFactory; 024import javax.jms.QueueSession; 025import javax.jms.Session; 026import javax.jms.TopicConnection; 027import javax.jms.TopicConnectionFactory; 028import javax.jms.TopicSession; 029 030import org.apache.commons.logging.Log; 031import org.apache.commons.logging.LogFactory; 032 033import org.springframework.lang.Nullable; 034import org.springframework.transaction.support.ResourceHolderSynchronization; 035import org.springframework.transaction.support.TransactionSynchronizationManager; 036import org.springframework.util.Assert; 037 038/** 039 * Helper class for managing a JMS {@link javax.jms.ConnectionFactory}, in particular 040 * for obtaining transactional JMS resources for a given ConnectionFactory. 041 * 042 * <p>Mainly for internal use within the framework. Used by 043 * {@link org.springframework.jms.core.JmsTemplate} as well as 044 * {@link org.springframework.jms.listener.DefaultMessageListenerContainer}. 045 * 046 * @author Juergen Hoeller 047 * @since 2.0 048 * @see SmartConnectionFactory 049 */ 050public abstract class ConnectionFactoryUtils { 051 052 private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class); 053 054 055 /** 056 * Release the given Connection, stopping it (if necessary) and eventually closing it. 057 * <p>Checks {@link SmartConnectionFactory#shouldStop}, if available. 058 * This is essentially a more sophisticated version of 059 * {@link org.springframework.jms.support.JmsUtils#closeConnection}. 060 * @param con the Connection to release 061 * (if this is {@code null}, the call will be ignored) 062 * @param cf the ConnectionFactory that the Connection was obtained from 063 * (may be {@code null}) 064 * @param started whether the Connection might have been started by the application 065 * @see SmartConnectionFactory#shouldStop 066 * @see org.springframework.jms.support.JmsUtils#closeConnection 067 */ 068 public static void releaseConnection(@Nullable Connection con, @Nullable ConnectionFactory cf, boolean started) { 069 if (con == null) { 070 return; 071 } 072 if (started && cf instanceof SmartConnectionFactory && ((SmartConnectionFactory) cf).shouldStop(con)) { 073 try { 074 con.stop(); 075 } 076 catch (Throwable ex) { 077 logger.debug("Could not stop JMS Connection before closing it", ex); 078 } 079 } 080 try { 081 con.close(); 082 } 083 catch (Throwable ex) { 084 logger.debug("Could not close JMS Connection", ex); 085 } 086 } 087 088 /** 089 * Return the innermost target Session of the given Session. If the given 090 * Session is a proxy, it will be unwrapped until a non-proxy Session is 091 * found. Otherwise, the passed-in Session will be returned as-is. 092 * @param session the Session proxy to unwrap 093 * @return the innermost target Session, or the passed-in one if no proxy 094 * @see SessionProxy#getTargetSession() 095 */ 096 public static Session getTargetSession(Session session) { 097 Session sessionToUse = session; 098 while (sessionToUse instanceof SessionProxy) { 099 sessionToUse = ((SessionProxy) sessionToUse).getTargetSession(); 100 } 101 return sessionToUse; 102 } 103 104 105 106 /** 107 * Determine whether the given JMS Session is transactional, that is, 108 * bound to the current thread by Spring's transaction facilities. 109 * @param session the JMS Session to check 110 * @param cf the JMS ConnectionFactory that the Session originated from 111 * @return whether the Session is transactional 112 */ 113 public static boolean isSessionTransactional(@Nullable Session session, @Nullable ConnectionFactory cf) { 114 if (session == null || cf == null) { 115 return false; 116 } 117 JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(cf); 118 return (resourceHolder != null && resourceHolder.containsSession(session)); 119 } 120 121 122 /** 123 * Obtain a JMS Session that is synchronized with the current transaction, if any. 124 * @param cf the ConnectionFactory to obtain a Session for 125 * @param existingCon the existing JMS Connection to obtain a Session for 126 * (may be {@code null}) 127 * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction 128 * that is synchronized with a Spring-managed transaction (where the main transaction 129 * might be a JDBC-based one for a specific DataSource, for example), with the JMS 130 * transaction committing right after the main transaction. If not allowed, the given 131 * ConnectionFactory needs to handle transaction enlistment underneath the covers. 132 * @return the transactional Session, or {@code null} if none found 133 * @throws JMSException in case of JMS failure 134 */ 135 @Nullable 136 public static Session getTransactionalSession(final ConnectionFactory cf, 137 @Nullable final Connection existingCon, final boolean synchedLocalTransactionAllowed) 138 throws JMSException { 139 140 return doGetTransactionalSession(cf, new ResourceFactory() { 141 @Override 142 @Nullable 143 public Session getSession(JmsResourceHolder holder) { 144 return holder.getSession(Session.class, existingCon); 145 } 146 @Override 147 @Nullable 148 public Connection getConnection(JmsResourceHolder holder) { 149 return (existingCon != null ? existingCon : holder.getConnection()); 150 } 151 @Override 152 public Connection createConnection() throws JMSException { 153 return cf.createConnection(); 154 } 155 @Override 156 public Session createSession(Connection con) throws JMSException { 157 return con.createSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 158 } 159 @Override 160 public boolean isSynchedLocalTransactionAllowed() { 161 return synchedLocalTransactionAllowed; 162 } 163 }, true); 164 } 165 166 /** 167 * Obtain a JMS QueueSession that is synchronized with the current transaction, if any. 168 * <p>Mainly intended for use with the JMS 1.0.2 API. 169 * @param cf the ConnectionFactory to obtain a Session for 170 * @param existingCon the existing JMS Connection to obtain a Session for 171 * (may be {@code null}) 172 * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction 173 * that is synchronized with a Spring-managed transaction (where the main transaction 174 * might be a JDBC-based one for a specific DataSource, for example), with the JMS 175 * transaction committing right after the main transaction. If not allowed, the given 176 * ConnectionFactory needs to handle transaction enlistment underneath the covers. 177 * @return the transactional Session, or {@code null} if none found 178 * @throws JMSException in case of JMS failure 179 */ 180 @Nullable 181 public static QueueSession getTransactionalQueueSession(final QueueConnectionFactory cf, 182 @Nullable final QueueConnection existingCon, final boolean synchedLocalTransactionAllowed) 183 throws JMSException { 184 185 return (QueueSession) doGetTransactionalSession(cf, new ResourceFactory() { 186 @Override 187 @Nullable 188 public Session getSession(JmsResourceHolder holder) { 189 return holder.getSession(QueueSession.class, existingCon); 190 } 191 @Override 192 @Nullable 193 public Connection getConnection(JmsResourceHolder holder) { 194 return (existingCon != null ? existingCon : holder.getConnection(QueueConnection.class)); 195 } 196 @Override 197 public Connection createConnection() throws JMSException { 198 return cf.createQueueConnection(); 199 } 200 @Override 201 public Session createSession(Connection con) throws JMSException { 202 return ((QueueConnection) con).createQueueSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 203 } 204 @Override 205 public boolean isSynchedLocalTransactionAllowed() { 206 return synchedLocalTransactionAllowed; 207 } 208 }, true); 209 } 210 211 /** 212 * Obtain a JMS TopicSession that is synchronized with the current transaction, if any. 213 * <p>Mainly intended for use with the JMS 1.0.2 API. 214 * @param cf the ConnectionFactory to obtain a Session for 215 * @param existingCon the existing JMS Connection to obtain a Session for 216 * (may be {@code null}) 217 * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction 218 * that is synchronized with a Spring-managed transaction (where the main transaction 219 * might be a JDBC-based one for a specific DataSource, for example), with the JMS 220 * transaction committing right after the main transaction. If not allowed, the given 221 * ConnectionFactory needs to handle transaction enlistment underneath the covers. 222 * @return the transactional Session, or {@code null} if none found 223 * @throws JMSException in case of JMS failure 224 */ 225 @Nullable 226 public static TopicSession getTransactionalTopicSession(final TopicConnectionFactory cf, 227 @Nullable final TopicConnection existingCon, final boolean synchedLocalTransactionAllowed) 228 throws JMSException { 229 230 return (TopicSession) doGetTransactionalSession(cf, new ResourceFactory() { 231 @Override 232 @Nullable 233 public Session getSession(JmsResourceHolder holder) { 234 return holder.getSession(TopicSession.class, existingCon); 235 } 236 @Override 237 @Nullable 238 public Connection getConnection(JmsResourceHolder holder) { 239 return (existingCon != null ? existingCon : holder.getConnection(TopicConnection.class)); 240 } 241 @Override 242 public Connection createConnection() throws JMSException { 243 return cf.createTopicConnection(); 244 } 245 @Override 246 public Session createSession(Connection con) throws JMSException { 247 return ((TopicConnection) con).createTopicSession( 248 synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE); 249 } 250 @Override 251 public boolean isSynchedLocalTransactionAllowed() { 252 return synchedLocalTransactionAllowed; 253 } 254 }, true); 255 } 256 257 /** 258 * Obtain a JMS Session that is synchronized with the current transaction, if any. 259 * <p>This {@code doGetTransactionalSession} variant always starts the underlying 260 * JMS Connection, assuming that the Session will be used for receiving messages. 261 * @param connectionFactory the JMS ConnectionFactory to bind for 262 * (used as TransactionSynchronizationManager key) 263 * @param resourceFactory the ResourceFactory to use for extracting or creating 264 * JMS resources 265 * @return the transactional Session, or {@code null} if none found 266 * @throws JMSException in case of JMS failure 267 * @see #doGetTransactionalSession(javax.jms.ConnectionFactory, ResourceFactory, boolean) 268 */ 269 @Nullable 270 public static Session doGetTransactionalSession( 271 ConnectionFactory connectionFactory, ResourceFactory resourceFactory) throws JMSException { 272 273 return doGetTransactionalSession(connectionFactory, resourceFactory, true); 274 } 275 276 /** 277 * Obtain a JMS Session that is synchronized with the current transaction, if any. 278 * @param connectionFactory the JMS ConnectionFactory to bind for 279 * (used as TransactionSynchronizationManager key) 280 * @param resourceFactory the ResourceFactory to use for extracting or creating 281 * JMS resources 282 * @param startConnection whether the underlying JMS Connection approach should be 283 * started in order to allow for receiving messages. Note that a reused Connection 284 * may already have been started before, even if this flag is {@code false}. 285 * @return the transactional Session, or {@code null} if none found 286 * @throws JMSException in case of JMS failure 287 */ 288 @Nullable 289 public static Session doGetTpan class="sourceLineNo">290 ConnectionFactory connectionFactory, ResourceFactory resourceFactory, boolean startConnection) 291 throws JMSException { 292 293 Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); 294 Assert.notNull(resourceFactory, "ResourceFactory must not be null"); 295 296 JmsResourceHolder resourceHolder = 297 (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); 298 if (resourceHolder != null) { 299 Session session = resourceFactory.getSession(resourceHolder); 300 if (session != null) { 301 if (startConnection) { 302 Connection con = resourceFactory.getConnection(resourceHolder); 303 if (con != null) { 304 con.start(); 305 } 306 } 307 return session; 308 } 309 if (resourceHolder.isFrozen()) { 310 return null; 311 } 312 } 313 if (!TransactionSynchronizationManager.isSynchronizationActive()) { 314 return null; 315 } 316 JmsResourceHolder resourceHolderToUse = resourceHolder; 317 if (resourceHolderToUse == null) { 318 resourceHolderToUse = new JmsResourceHolder(connectionFactory); 319 } 320 Connection con = resourceFactory.getConnection(resourceHolderToUse); 321 Session session = null; 322 try { 323 boolean isExistingCon = (con != null); 324 if (!isExistingCon) { 325 con = resourceFactory.createConnection(); 326 resourceHolderToUse.addConnection(con); 327 } 328 session = resourceFactory.createSession(con); 329 resourceHolderToUse.addSession(session, con); 330 if (startConnection) { 331 con.start(); 332 } 333 } 334 catch (JMSException ex) { 335 if (session != null) { 336 try { 337 session.close(); 338 } 339 catch (Throwable ex2) { 340 // ignore 341 } 342 } 343 if (con != null) { 344 try { 345 con.close(); 346 } 347 catch (Throwable ex2) { 348 // ignore 349 } 350 } 351 throw ex; 352 } 353 if (resourceHolderToUse != resourceHolder) { 354 TransactionSynchronizationManager.registerSynchronization( 355 new JmsResourceSynchronization(resourceHolderToUse, connectionFactory, 356 resourceFactory.isSynchedLocalTransactionAllowed())); 357 resourceHolderToUse.setSynchronizedWithTransaction(true); 358 TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse); 359 } 360 return session; 361 } 362 363 364 /** 365 * Callback interface for resource creation. 366 * Serving as argument for the {@code doGetTransactionalSession} method. 367 */ 368 public interface ResourceFactory { 369 370 /** 371 * Fetch an appropriate Session from the given JmsResourceHolder. 372 * @param holder the JmsResourceHolder 373 * @return an appropriate Session fetched from the holder, 374 * or {@code null} if none found 375 */ 376 @Nullable 377 Session getSession(JmsResourceHolder holder); 378 379 /** 380 * Fetch an appropriate Connection from the given JmsResourceHolder. 381 * @param holder the JmsResourceHolder 382 * @return an appropriate Connection fetched from the holder, 383 * or {@code null} if none found 384 */ 385 @Nullable 386 Connection getConnection(JmsResourceHolder holder); 387 388 /** 389 * Create a new JMS Connection for registration with a JmsResourceHolder. 390 * @return the new JMS Connection 391 * @throws JMSException if thrown by JMS API methods 392 */ 393 Connection createConnection() throws JMSException; 394 395 /** 396 * Create a new JMS Session for registration with a JmsResourceHolder. 397 * @param con the JMS Connection to create a Session for 398 * @return the new JMS Session 399 * @throws JMSException if thrown by JMS API methods 400 */ 401 Session createSession(Connection con) throws JMSException; 402 403 /** 404 * Return whether to allow for a local JMS transaction that is synchronized with 405 * a Spring-managed transaction (where the main transaction might be a JDBC-based 406 * one for a specific DataSource, for example), with the JMS transaction 407 * committing right after the main transaction. 408 * @return whether to allow for synchronizing a local JMS transaction 409 */ 410 boolean isSynchedLocalTransactionAllowed(); 411 } 412 413 414 /** 415 * Callback for resource cleanup at the end of a non-native JMS transaction 416 * (e.g. when participating in a JtaTransactionManager transaction). 417 * @see org.springframework.transaction.jta.JtaTransactionManager 418 */ 419 private static class JmsResourceSynchronization extends ResourceHolderSynchronization<JmsResourceHolder, Object> { 420 421 private final boolean transacted; 422 423 public JmsResourceSynchronization(JmsResourceHolder resourceHolder, Object resourceKey, boolean transacted) { 424 super(resourceHolder, resourceKey); 425 this.transacted = transacted; 426 } 427 428 @Override 429 protected boolean shouldReleaseBeforeCompletion() { 430 return !this.transacted; 431 } 432 433 @Override 434 protected void processResourceAfterCommit(JmsResourceHolder resourceHolder) { 435 try { 436 resourceHolder.commitAll(); 437 } 438 catch (JMSException ex) { 439 throw new SynchedLocalTransactionFailedException("Local JMS transaction failed to commit", ex); 440 } 441 } 442 443 @Override 444 protected void releaseResource(JmsResourceHolder resourceHolder, Object resourceKey) { 445 resourceHolder.closeAll(); 446 } 447 } 448 449}