001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.JMSException; 022import javax.jms.Message; 023import javax.jms.MessageConsumer; 024import javax.jms.Session; 025 026import org.springframework.jms.connection.ConnectionFactoryUtils; 027import org.springframework.jms.connection.JmsResourceHolder; 028import org.springframework.jms.connection.SingleConnectionFactory; 029import org.springframework.jms.support.JmsUtils; 030import org.springframework.lang.Nullable; 031import org.springframework.transaction.PlatformTransactionManager; 032import org.springframework.transaction.TransactionStatus; 033import org.springframework.transaction.support.DefaultTransactionDefinition; 034import org.springframework.transaction.support.ResourceTransactionManager; 035import org.springframework.transaction.support.TransactionSynchronizationManager; 036import org.springframework.transaction.support.TransactionSynchronizationUtils; 037import org.springframework.util.Assert; 038 039/** 040 * Base class for listener container implementations which are based on polling. 041 * Provides support for listener handling based on {@link javax.jms.MessageConsumer}, 042 * optionally participating in externally managed transactions. 043 * 044 * <p>This listener container variant is built for repeated polling attempts, 045 * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used 046 * may be reobtained fo reach attempt or cached in between attempts; this is up 047 * to the concrete implementation. The receive timeout for each attempt can be 048 * configured through the {@link #setReceiveTimeout "receiveTimeout"} property. 049 * 050 * <p>The underlying mechanism is based on standard JMS MessageConsumer handling, 051 * which is perfectly compatible with both native JMS and JMS in a Java EE environment. 052 * Neither the JMS {@code MessageConsumer.setMessageListener} facility nor the JMS 053 * ServerSessionPool facility is required. A further advantage of this approach is 054 * full control over the listening process, allowing for custom scaling and throttling 055 * and of concurrent message processing (which is up to concrete subclasses). 056 * 057 * <p>Message reception and listener execution can automatically be wrapped 058 * in transactions through passing a Spring 059 * {@link org.springframework.transaction.PlatformTransactionManager} into the 060 * {@link #setTransactionManager "transactionManager"} property. This will usually 061 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a 062 * Java EE environment, in combination with a JTA-aware JMS ConnectionFactory 063 * obtained from JNDI (check your application server's documentation). 064 * 065 * <p>This base class does not assume any specific mechanism for asynchronous 066 * execution of polling invokers. Check out {@link DefaultMessageListenerContainer} 067 * for a concrete implementation which is based on Spring's 068 * {@link org.springframework.core.task.TaskExecutor} abstraction, 069 * including dynamic scaling of concurrent consumers and automatic self recovery. 070 * 071 * @author Juergen Hoeller 072 * @since 2.0.3 073 * @see #createListenerConsumer 074 * @see #receiveAndExecute 075 * @see #setTransactionManager 076 */ 077public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer { 078 079 /** 080 * The default receive timeout: 1000 ms = 1 second. 081 */ 082 public static final long DEFAULT_RECEIVE_TIMEOUT = 1000; 083 084 085 private final MessageListenerContainerResourceFactory transactionalResourceFactory = 086 new MessageListenerContainerResourceFactory(); 087 088 private boolean sessionTransactedCalled = false; 089 090 @Nullable 091 private PlatformTransactionManager transactionManager; 092 093 private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition(); 094 095 private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; 096 097 098 @Override 099 public void setSessionTransacted(boolean sessionTransacted) { 100 super.setSessionTransacted(sessionTransacted); 101 this.sessionTransactedCalled = true; 102 } 103 104 /** 105 * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager} 106 * to use for transactional wrapping of message reception plus listener execution. 107 * <p>Default is none, not performing any transactional wrapping. 108 * If specified, this will usually be a Spring 109 * {@link org.springframework.transaction.jta.JtaTransactionManager} or one 110 * of its subclasses, in combination with a JTA-aware ConnectionFactory that 111 * this message listener container obtains its Connections from. 112 * <p><b>Note: Consider the use of local JMS transactions instead.</b> 113 * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag 114 * to "true" in order to use a locally transacted JMS Session for the entire 115 * receive processing, including any Session operations performed by a 116 * {@link SessionAwareMessageListener} (e.g. sending a response message). This 117 * allows for fully synchronized Spring transactions based on local JMS 118 * transactions, similar to what 119 * {@link org.springframework.jms.connection.JmsTransactionManager} provides. Check 120 * {@link AbstractMessageListenerContainer}'s javadoc for 121 * a discussion of transaction choices and message redelivery scenarios. 122 * @see #setSessionTransacted(boolean) 123 * @see org.springframework.transaction.jta.JtaTransactionManager 124 * @see org.springframework.jms.connection.JmsTransactionManager 125 */ 126 public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) { 127 this.transactionManager = transactionManager; 128 } 129 130 /** 131 * Return the Spring PlatformTransactionManager to use for transactional 132 * wrapping of message reception plus listener execution. 133 */ 134 @Nullable 135 protected final PlatformTransactionManager getTransactionManager() { 136 return this.transactionManager; 137 } 138 139 /** 140 * Specify the transaction name to use for transactional wrapping. 141 * Default is the bean name of this listener container, if any. 142 * @see org.springframework.transaction.TransactionDefinition#getName() 143 */ 144 public void setTransactionName(String transactionName) { 145 this.transactionDefinition.setName(transactionName); 146 } 147 148 /** 149 * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>. 150 * Default is none, using the transaction manager's default timeout. 151 * @see org.springframework.transaction.TransactionDefinition#getTimeout() 152 * @see #setReceiveTimeout 153 */ 154 public void setTransactionTimeout(int transactionTimeout) { 155 this.transactionDefinition.setTimeout(transactionTimeout); 156 } 157 158 /** 159 * Set the timeout to use for receive calls, in <b>milliseconds</b>. 160 * The default is 1000 ms, that is, 1 second. 161 * <p><b>NOTE:</b> This value needs to be smaller than the transaction 162 * timeout used by the transaction manager (in the appropriate unit, 163 * of course). 0 indicates no timeout at all; however, this is only 164 * feasible if not running within a transaction manager and generally 165 * discouraged since such a listener container cannot cleanly shut down. 166 * A negative value such as -1 indicates a no-wait receive operation. 167 * @see #receiveFromConsumer(MessageConsumer, long) 168 * @see javax.jms.MessageConsumer#receive(long) 169 * @see javax.jms.MessageConsumer#receiveNoWait() 170 * @see javax.jms.MessageConsumer#receive() 171 * @see #setTransactionTimeout 172 */ 173 public void setReceiveTimeout(long receiveTimeout) { 174 this.receiveTimeout = receiveTimeout; 175 } 176 177 /** 178 * Return the receive timeout (ms) configured for this listener container. 179 * @since 4.2 180 */ 181 protected long getReceiveTimeout() { 182 return this.receiveTimeout; 183 } 184 185 186 @Override 187 public void initialize() { 188 // Set sessionTransacted=true in case of a non-JTA transaction manager. 189 if (!this.sessionTransactedCalled && 190 this.transactionManager instanceof ResourceTransactionManager && 191 !TransactionSynchronizationUtils.sameResourceFactory( 192 (ResourceTransactionManager) this.transactionManager, obtainConnectionFactory())) { 193 super.setSessionTransacted(true); 194 } 195 196 // Use bean name as default transaction name. 197 if (this.transactionDefinition.getName() == null) { 198 String beanName = getBeanName(); 199 if (beanName != null) { 200 this.transactionDefinition.setName(beanName); 201 } 202 } 203 204 // Proceed with superclass initialization. 205 super.initialize(); 206 } 207 208 209 /** 210 * Create a MessageConsumer for the given JMS Session, 211 * registering a MessageListener for the specified listener. 212 * @param session the JMS Session to work on 213 * @return the MessageConsumer 214 * @throws javax.jms.JMSException if thrown by JMS methods 215 * @see #receiveAndExecute 216 */ 217 protected MessageConsumer createListenerConsumer(Session session) throws JMSException { 218 Destination destination = getDestination(); 219 if (destination == null) { 220 String destinationName = getDestinationName(); 221 Assert.state(destinationName != null, "No destination set"); 222 destination = resolveDestinationName(session, destinationName); 223 } 224 return createConsumer(session, destination); 225 } 226 227 /** 228 * Execute the listener for a message received from the given consumer, 229 * wrapping the entire operation in an external transaction if demanded. 230 * @param session the JMS Session to work on 231 * @param consumer the MessageConsumer to work on 232 * @return whether a message has been received 233 * @throws JMSException if thrown by JMS methods 234 * @see #doReceiveAndExecute 235 */ 236 protected boolean receiveAndExecute( 237 Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer) 238 throws JMSException { 239 240 if (this.transactionManager != null) { 241 // Execute receive within transaction. 242 TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition); 243 boolean messageReceived; 244 try { 245 messageReceived = doReceiveAndExecute(invoker, session, consumer, status); 246 } 247 catch (JMSException | RuntimeException | Error ex) { 248 rollbackOnException(this.transactionManager, status, ex); 249 throw ex; 250 } 251 this.transactionManager.commit(status); 252 return messageReceived; 253 } 254 255 else { 256 // Execute receive outside of transaction. 257 return doReceiveAndExecute(invoker, session, consumer, null); 258 } 259 } 260 261 /** 262 * Actually execute the listener for a message received from the given consumer, 263 * fetching all requires resources and invoking the listener. 264 * @param session the JMS Session to work on 265 * @param consumer the MessageConsumer to work on 266 * @param status the TransactionStatus (may be {@code null}) 267 * @return whether a message has been received 268 * @throws JMSException if thrown by JMS methods 269 * @see #doExecuteListener(javax.jms.Session, javax.jms.Message) 270 */ 271 protected boolean doReceiveAndExecute(Object invoker, @Nullable Session session, 272 @Nullable MessageConsumer consumer, @Nullable TransactionStatus status) throws JMSException { 273 274 Connection conToClose = null; 275 Session sessionToClose = null; 276 MessageConsumer consumerToClose = null; 277 try { 278 Session sessionToUse = session; 279 boolean transactional = false; 280 if (sessionToUse == null) { 281 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 282 obtainConnectionFactory(), this.transactionalResourceFactory, true); 283 transactional = (sessionToUse != null); 284 } 285 if (sessionToUse == null) { 286 Connection conToUse; 287 if (sharedConnectionEnabled()) { 288 conToUse = getSharedConnection(); 289 } 290 else { 291 conToUse = createConnection(); 292 conToClose = conToUse; 293 conToUse.start(); 294 } 295 sessionToUse = createSession(conToUse); 296 sessionToClose = sessionToUse; 297 } 298 MessageConsumer consumerToUse = consumer; 299 if (consumerToUse == null) { 300 consumerToUse = createListenerConsumer(sessionToUse); 301 consumerToClose = consumerToUse; 302 } 303 Message message = receiveMessage(consumerToUse); 304 if (message != null) { 305 if (logger.isDebugEnabled()) { 306 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 307 consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + 308 sessionToUse + "]"); 309 } 310 messageReceived(invoker, sessionToUse); 311 boolean exposeResource = (!transactional && isExposeListenerSession() && 312 !TransactionSynchronizationManager.hasResource(obtainConnectionFactory())); 313 if (exposeResource) { 314 TransactionSynchronizationManager.bindResource( 315 obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); 316 } 317 try { 318 doExecuteListener(sessionToUse, message); 319 } 320 catch (Throwable ex) { 321 if (status != null) { 322 if (logger.isDebugEnabled()) { 323 logger.debug("Rolling back transaction because of listener exception thrown: " + ex); 324 } 325 status.setRollbackOnly(); 326 } 327 handleListenerException(ex); 328 // Rethrow JMSException to indicate an infrastructure problem 329 // that may have to trigger recovery... 330 if (ex instanceof JMSException) { 331 throw (JMSException) ex; 332 } 333 } 334 finally { 335 if (exposeResource) { 336 TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); 337 } 338 } 339 // Indicate that a message has been received. 340 return true; 341 } 342 else { 343 if (logger.isTraceEnabled()) { 344 logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + 345 "session [" + sessionToUse + "] did not receive a message"); 346 } 347 noMessageReceived(invoker, sessionToUse); 348 // Nevertheless call commit, in order to reset the transaction timeout (if any). 349 if (shouldCommitAfterNoMessageReceived(sessionToUse)) { 350 commitIfNecessary(sessionToUse, null); 351 } 352 // Indicate that no message has been received. 353 return false; 354 } 355 } 356 finally { 357 JmsUtils.closeMessageConsumer(consumerToClose); 358 JmsUtils.closeSession(sessionToClose); 359 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); 360 } 361 } 362 363 /** 364 * This implementation checks whether the Session is externally synchronized. 365 * In this case, the Session is not locally transacted, despite the listener 366 * container's "sessionTransacted" flag being set to "true". 367 * @see org.springframework.jms.connection.JmsResourceHolder 368 */ 369 @Override 370 protected boolean isSessionLocallyTransacted(Session session) { 371 if (!super.isSessionLocallyTransacted(session)) { 372 return false; 373 } 374 JmsResourceHolder resourceHolder = 375 (JmsResourceHolder) TransactionSynchronizationManager.getResource(obtainConnectionFactory()); 376 return (resourceHolder == null || resourceHolder instanceof LocallyExposedJmsResourceHolder || 377 !resourceHolder.containsSession(session)); 378 } 379 380 /** 381 * Determine whether to trigger a commit after no message has been received. 382 * This is a good idea on any modern-day JMS provider. 383 * @param session the current JMS Session which received no message 384 * @return whether to call {@link #commitIfNecessary} on the given Session 385 */ 386 protected boolean shouldCommitAfterNoMessageReceived(Session session) { 387 return true; 388 } 389 390 /** 391 * Perform a rollback, handling rollback exceptions properly. 392 * @param status object representing the transaction 393 * @param ex the thrown listener exception or error 394 */ 395 private void rollbackOnException(PlatformTransactionManager manager, TransactionStatus status, Throwable ex) { 396 logger.debug("Initiating transaction rollback on listener exception", ex); 397 try { 398 manager.rollback(status); 399 } 400 catch (RuntimeException ex2) { 401 logger.error("Listener exception overridden by rollback exception", ex); 402 throw ex2; 403 } 404 catch (Error err) { 405 logger.error("Listener exception overridden by rollback error", ex); 406 throw err; 407 } 408 } 409 410 /** 411 * Receive a message from the given consumer. 412 * @param consumer the MessageConsumer to use 413 * @return the Message, or {@code null} if none 414 * @throws JMSException if thrown by JMS methods 415 */ 416 @Nullable 417 protected Message receiveMessage(MessageConsumer consumer) throws JMSException { 418 return receiveFromConsumer(consumer, getReceiveTimeout()); 419 } 420 421 /** 422 * Template method that gets called right when a new message has been received, 423 * before attempting to process it. Allows subclasses to react to the event 424 * of an actual incoming message, for example adapting their consumer count. 425 * @param invoker the invoker object (passed through) 426 * @param session the receiving JMS Session 427 */ 428 protected void messageReceived(Object invoker, Session session) { 429 } 430 431 /** 432 * Template method that gets called when <i>no</i> message has been received, 433 * before returning to the receive loop again. Allows subclasses to react to 434 * the event of no incoming message, for example marking the invoker as idle. 435 * @param invoker the invoker object (passed through) 436 * @param session the receiving JMS Session 437 */ 438 protected void noMessageReceived(Object invoker, Session session) { 439 } 440 441 /** 442 * Fetch an appropriate Connection from the given JmsResourceHolder. 443 * <p>This implementation accepts any JMS 1.1 Connection. 444 * @param holder the JmsResourceHolder 445 * @return an appropriate Connection fetched from the holder, 446 * or {@code null} if none found 447 */ 448 @Nullable 449 protected Connection getConnection(JmsResourceHolder holder) { 450 return holder.getConnection(); 451 } 452 453 /** 454 * Fetch an appropriate Session from the given JmsResourceHolder. 455 * <p>This implementation accepts any JMS 1.1 Session. 456 * @param holder the JmsResourceHolder 457 * @return an appropriate Session fetched from the holder, 458 * or {@code null} if none found 459 */ 460 @Nullable 461 protected Session getSession(JmsResourceHolder holder) { 462 return holder.getSession(); 463 } 464 465 466 /** 467 * ResourceFactory implementation that delegates to this listener container's protected callback methods. 468 */ 469 private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory { 470 471 @Override 472 @Nullable 473 public Connection getConnection(JmsResourceHolder holder) { 474 return AbstractPollingMessageListenerContainer.this.getConnection(holder); 475 } 476 477 @Override 478 @Nullable 479 public Session getSession(JmsResourceHolder holder) { 480 return AbstractPollingMessageListenerContainer.this.getSession(holder); 481 } 482 483 @Override 484 public Connection createConnection() throws JMSException { 485 if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) { 486 Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection(); 487 return new SingleConnectionFactory(sharedCon).createConnection(); 488 } 489 else { 490 return AbstractPollingMessageListenerContainer.this.createConnection(); 491 } 492 } 493 494 @Override 495 public Session createSession(Connection con) throws JMSException { 496 return AbstractPollingMessageListenerContainer.this.createSession(con); 497 } 498 499 @Override 500 public boolean isSynchedLocalTransactionAllowed() { 501 return AbstractPollingMessageListenerContainer.this.isSessionTransacted(); 502 } 503 } 504 505}