001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.JMSException; 022import javax.jms.Message; 023import javax.jms.MessageConsumer; 024import javax.jms.Session; 025 026import org.springframework.jms.connection.ConnectionFactoryUtils; 027import org.springframework.jms.connection.JmsResourceHolder; 028import org.springframework.jms.connection.SingleConnectionFactory; 029import org.springframework.jms.support.JmsUtils; 030import org.springframework.transaction.PlatformTransactionManager; 031import org.springframework.transaction.TransactionStatus; 032import org.springframework.transaction.support.DefaultTransactionDefinition; 033import org.springframework.transaction.support.ResourceTransactionManager; 034import org.springframework.transaction.support.TransactionSynchronizationManager; 035import org.springframework.transaction.support.TransactionSynchronizationUtils; 036 037/** 038 * Base class for listener container implementations which are based on polling. 039 * Provides support for listener handling based on {@link javax.jms.MessageConsumer}, 040 * optionally participating in externally managed transactions. 041 * 042 * <p>This listener container variant is built for repeated polling attempts, 043 * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used 044 * may be reobtained fo reach attempt or cached in between attempts; this is up 045 * to the concrete implementation. The receive timeout for each attempt can be 046 * configured through the {@link #setReceiveTimeout "receiveTimeout"} property. 047 * 048 * <p>The underlying mechanism is based on standard JMS MessageConsumer handling, 049 * which is perfectly compatible with both native JMS and JMS in a Java EE environment. 050 * Neither the JMS {@code MessageConsumer.setMessageListener} facility nor the JMS 051 * ServerSessionPool facility is required. A further advantage of this approach is 052 * full control over the listening process, allowing for custom scaling and throttling 053 * and of concurrent message processing (which is up to concrete subclasses). 054 * 055 * <p>Message reception and listener execution can automatically be wrapped 056 * in transactions through passing a Spring 057 * {@link org.springframework.transaction.PlatformTransactionManager} into the 058 * {@link #setTransactionManager "transactionManager"} property. This will usually 059 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a 060 * Java EE environment, in combination with a JTA-aware JMS ConnectionFactory 061 * obtained from JNDI (check your application server's documentation). 062 * 063 * <p>This base class does not assume any specific mechanism for asynchronous 064 * execution of polling invokers. Check out {@link DefaultMessageListenerContainer} 065 * for a concrete implementation which is based on Spring's 066 * {@link org.springframework.core.task.TaskExecutor} abstraction, 067 * including dynamic scaling of concurrent consumers and automatic self recovery. 068 * 069 * @author Juergen Hoeller 070 * @since 2.0.3 071 * @see #createListenerConsumer 072 * @see #receiveAndExecute 073 * @see #setTransactionManager 074 */ 075public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer { 076 077 /** 078 * The default receive timeout: 1000 ms = 1 second. 079 */ 080 public static final long DEFAULT_RECEIVE_TIMEOUT = 1000; 081 082 083 private final MessageListenerContainerResourceFactory transactionalResourceFactory = 084 new MessageListenerContainerResourceFactory(); 085 086 private boolean sessionTransactedCalled = false; 087 088 private PlatformTransactionManager transactionManager; 089 090 private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition(); 091 092 private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; 093 094 095 @Override 096 public void setSessionTransacted(boolean sessionTransacted) { 097 super.setSessionTransacted(sessionTransacted); 098 this.sessionTransactedCalled = true; 099 } 100 101 /** 102 * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager} 103 * to use for transactional wrapping of message reception plus listener execution. 104 * <p>Default is none, not performing any transactional wrapping. 105 * If specified, this will usually be a Spring 106 * {@link org.springframework.transaction.jta.JtaTransactionManager} or one 107 * of its subclasses, in combination with a JTA-aware ConnectionFactory that 108 * this message listener container obtains its Connections from. 109 * <p><b>Note: Consider the use of local JMS transactions instead.</b> 110 * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag 111 * to "true" in order to use a locally transacted JMS Session for the entire 112 * receive processing, including any Session operations performed by a 113 * {@link SessionAwareMessageListener} (e.g. sending a response message). This 114 * allows for fully synchronized Spring transactions based on local JMS 115 * transactions, similar to what 116 * {@link org.springframework.jms.connection.JmsTransactionManager} provides. Check 117 * {@link AbstractMessageListenerContainer}'s javadoc for 118 * a discussion of transaction choices and message redelivery scenarios. 119 * @see #setSessionTransacted(boolean) 120 * @see org.springframework.transaction.jta.JtaTransactionManager 121 * @see org.springframework.jms.connection.JmsTransactionManager 122 */ 123 public void setTransactionManager(PlatformTransactionManager transactionManager) { 124 this.transactionManager = transactionManager; 125 } 126 127 /** 128 * Return the Spring PlatformTransactionManager to use for transactional 129 * wrapping of message reception plus listener execution. 130 */ 131 protected final PlatformTransactionManager getTransactionManager() { 132 return this.transactionManager; 133 } 134 135 /** 136 * Specify the transaction name to use for transactional wrapping. 137 * Default is the bean name of this listener container, if any. 138 * @see org.springframework.transaction.TransactionDefinition#getName() 139 */ 140 public void setTransactionName(String transactionName) { 141 this.transactionDefinition.setName(transactionName); 142 } 143 144 /** 145 * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>. 146 * Default is none, using the transaction manager's default timeout. 147 * @see org.springframework.transaction.TransactionDefinition#getTimeout() 148 * @see #setReceiveTimeout 149 */ 150 public void setTransactionTimeout(int transactionTimeout) { 151 this.transactionDefinition.setTimeout(transactionTimeout); 152 } 153 154 /** 155 * Set the timeout to use for receive calls, in <b>milliseconds</b>. 156 * The default is 1000 ms, that is, 1 second. 157 * <p><b>NOTE:</b> This value needs to be smaller than the transaction 158 * timeout used by the transaction manager (in the appropriate unit, 159 * of course). 0 indicates no timeout at all; however, this is only 160 * feasible if not running within a transaction manager and generally 161 * discouraged since such a listener container cannot cleanly shut down. 162 * A negative value such as -1 indicates a no-wait receive operation. 163 * @see #receiveFromConsumer(MessageConsumer, long) 164 * @see javax.jms.MessageConsumer#receive(long) 165 * @see javax.jms.MessageConsumer#receiveNoWait() 166 * @see javax.jms.MessageConsumer#receive() 167 * @see #setTransactionTimeout 168 */ 169 public void setReceiveTimeout(long receiveTimeout) { 170 this.receiveTimeout = receiveTimeout; 171 } 172 173 /** 174 * Return the receive timeout (ms) configured for this listener container. 175 * @since 4.2 176 */ 177 protected long getReceiveTimeout() { 178 return this.receiveTimeout; 179 } 180 181 182 @Override 183 public void initialize() { 184 // Set sessionTransacted=true in case of a non-JTA transaction manager. 185 if (!this.sessionTransactedCalled && 186 this.transactionManager instanceof ResourceTransactionManager && 187 !TransactionSynchronizationUtils.sameResourceFactory( 188 (ResourceTransactionManager) this.transactionManager, getConnectionFactory())) { 189 super.setSessionTransacted(true); 190 } 191 192 // Use bean name as default transaction name. 193 if (this.transactionDefinition.getName() == null) { 194 this.transactionDefinition.setName(getBeanName()); 195 } 196 197 // Proceed with superclass initialization. 198 super.initialize(); 199 } 200 201 202 /** 203 * Create a MessageConsumer for the given JMS Session, 204 * registering a MessageListener for the specified listener. 205 * @param session the JMS Session to work on 206 * @return the MessageConsumer 207 * @throws javax.jms.JMSException if thrown by JMS methods 208 * @see #receiveAndExecute 209 */ 210 protected MessageConsumer createListenerConsumer(Session session) throws JMSException { 211 Destination destination = getDestination(); 212 if (destination == null) { 213 destination = resolveDestinationName(session, getDestinationName()); 214 } 215 return createConsumer(session, destination); 216 } 217 218 /** 219 * Execute the listener for a message received from the given consumer, 220 * wrapping the entire operation in an external transaction if demanded. 221 * @param session the JMS Session to work on 222 * @param consumer the MessageConsumer to work on 223 * @return whether a message has been received 224 * @throws JMSException if thrown by JMS methods 225 * @see #doReceiveAndExecute 226 */ 227 protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer) 228 throws JMSException { 229 230 if (this.transactionManager != null) { 231 // Execute receive within transaction. 232 TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition); 233 boolean messageReceived; 234 try { 235 messageReceived = doReceiveAndExecute(invoker, session, consumer, status); 236 } 237 catch (JMSException ex) { 238 rollbackOnException(status, ex); 239 throw ex; 240 } 241 catch (RuntimeException ex) { 242 rollbackOnException(status, ex); 243 throw ex; 244 } 245 catch (Error err) { 246 rollbackOnException(status, err); 247 throw err; 248 } 249 this.transactionManager.commit(status); 250 return messageReceived; 251 } 252 253 else { 254 // Execute receive outside of transaction. 255 return doReceiveAndExecute(invoker, session, consumer, null); 256 } 257 } 258 259 /** 260 * Actually execute the listener for a message received from the given consumer, 261 * fetching all requires resources and invoking the listener. 262 * @param session the JMS Session to work on 263 * @param consumer the MessageConsumer to work on 264 * @param status the TransactionStatus (may be {@code null}) 265 * @return whether a message has been received 266 * @throws JMSException if thrown by JMS methods 267 * @see #doExecuteListener(javax.jms.Session, javax.jms.Message) 268 */ 269 protected boolean doReceiveAndExecute( 270 Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) 271 throws JMSException { 272 273 Connection conToClose = null; 274 Session sessionToClose = null; 275 MessageConsumer consumerToClose = null; 276 try { 277 Session sessionToUse = session; 278 boolean transactional = false; 279 if (sessionToUse == null) { 280 sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( 281 getConnectionFactory(), this.transactionalResourceFactory, true); 282 transactional = (sessionToUse != null); 283 } 284 if (sessionToUse == null) { 285 Connection conToUse; 286 if (sharedConnectionEnabled()) { 287 conToUse = getSharedConnection(); 288 } 289 else { 290 conToUse = createConnection(); 291 conToClose = conToUse; 292 conToUse.start(); 293 } 294 sessionToUse = createSession(conToUse); 295 sessionToClose = sessionToUse; 296 } 297 MessageConsumer consumerToUse = consumer; 298 if (consumerToUse == null) { 299 consumerToUse = createListenerConsumer(sessionToUse); 300 consumerToClose = consumerToUse; 301 } 302 Message message = receiveMessage(consumerToUse); 303 if (message != null) { 304 if (logger.isDebugEnabled()) { 305 logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 306 consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + 307 sessionToUse + "]"); 308 } 309 messageReceived(invoker, sessionToUse); 310 boolean exposeResource = (!transactional && isExposeListenerSession() && 311 !TransactionSynchronizationManager.hasResource(getConnectionFactory())); 312 if (exposeResource) { 313 TransactionSynchronizationManager.bindResource( 314 getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); 315 } 316 try { 317 doExecuteListener(sessionToUse, message); 318 } 319 catch (Throwable ex) { 320 if (status != null) { 321 if (logger.isDebugEnabled()) { 322 logger.debug("Rolling back transaction because of listener exception thrown: " + ex); 323 } 324 status.setRollbackOnly(); 325 } 326 handleListenerException(ex); 327 // Rethrow JMSException to indicate an infrastructure problem 328 // that may have to trigger recovery... 329 if (ex instanceof JMSException) { 330 throw (JMSException) ex; 331 } 332 } 333 finally { 334 if (exposeResource) { 335 TransactionSynchronizationManager.unbindResource(getConnectionFactory()); 336 } 337 } 338 // Indicate that a message has been received. 339 return true; 340 } 341 else { 342 if (logger.isTraceEnabled()) { 343 logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + 344 "session [" + sessionToUse + "] did not receive a message"); 345 } 346 noMessageReceived(invoker, sessionToUse); 347 // Nevertheless call commit, in order to reset the transaction timeout (if any). 348 if (shouldCommitAfterNoMessageReceived(sessionToUse)) { 349 commitIfNecessary(sessionToUse, message); 350 } 351 // Indicate that no message has been received. 352 return false; 353 } 354 } 355 finally { 356 JmsUtils.closeMessageConsumer(consumerToClose); 357 JmsUtils.closeSession(sessionToClose); 358 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); 359 } 360 } 361 362 /** 363 * This implementation checks whether the Session is externally synchronized. 364 * In this case, the Session is not locally transacted, despite the listener 365 * container's "sessionTransacted" flag being set to "true". 366 * @see org.springframework.jms.connection.JmsResourceHolder 367 */ 368 @Override 369 protected boolean isSessionLocallyTransacted(Session session) { 370 if (!super.isSessionLocallyTransacted(session)) { 371 return false; 372 } 373 JmsResourceHolder resourceHolder = 374 (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory()); 375 return (resourceHolder == null || resourceHolder instanceof LocallyExposedJmsResourceHolder || 376 !resourceHolder.containsSession(session)); 377 } 378 379 /** 380 * Determine whether to trigger a commit after no message has been received. 381 * This is a good idea on any modern-day JMS provider. 382 * @param session the current JMS Session which received no message 383 * @return whether to call {@link #commitIfNecessary} on the given Session 384 */ 385 protected boolean shouldCommitAfterNoMessageReceived(Session session) { 386 return true; 387 } 388 389 /** 390 * Perform a rollback, handling rollback exceptions properly. 391 * @param status object representing the transaction 392 * @param ex the thrown listener exception or error 393 */ 394 private void rollbackOnException(TransactionStatus status, Throwable ex) { 395 logger.debug("Initiating transaction rollback on listener exception", ex); 396 try { 397 this.transactionManager.rollback(status); 398 } 399 catch (RuntimeException ex2) { 400 logger.error("Listener exception overridden by rollback exception", ex); 401 throw ex2; 402 } 403 catch (Error err) { 404 logger.error("Listener exception overridden by rollback error", ex); 405 throw err; 406 } 407 } 408 409 /** 410 * Receive a message from the given consumer. 411 * @param consumer the MessageConsumer to use 412 * @return the Message, or {@code null} if none 413 * @throws JMSException if thrown by JMS methods 414 */ 415 protected Message receiveMessage(MessageConsumer consumer) throws JMSException { 416 return receiveFromConsumer(consumer, getReceiveTimeout()); 417 } 418 419 /** 420 * Template method that gets called right when a new message has been received, 421 * before attempting to process it. Allows subclasses to react to the event 422 * of an actual incoming message, for example adapting their consumer count. 423 * @param invoker the invoker object (passed through) 424 * @param session the receiving JMS Session 425 */ 426 protected void messageReceived(Object invoker, Session session) { 427 } 428 429 /** 430 * Template method that gets called when <i>no</i> message has been received, 431 * before returning to the receive loop again. Allows subclasses to react to 432 * the event of no incoming message, for example marking the invoker as idle. 433 * @param invoker the invoker object (passed through) 434 * @param session the receiving JMS Session 435 */ 436 protected void noMessageReceived(Object invoker, Session session) { 437 } 438 439 /** 440 * Fetch an appropriate Connection from the given JmsResourceHolder. 441 * <p>This implementation accepts any JMS 1.1 Connection. 442 * @param holder the JmsResourceHolder 443 * @return an appropriate Connection fetched from the holder, 444 * or {@code null} if none found 445 */ 446 protected Connection getConnection(JmsResourceHolder holder) { 447 return holder.getConnection(); 448 } 449 450 /** 451 * Fetch an appropriate Session from the given JmsResourceHolder. 452 * <p>This implementation accepts any JMS 1.1 Session. 453 * @param holder the JmsResourceHolder 454 * @return an appropriate Session fetched from the holder, 455 * or {@code null} if none found 456 */ 457 protected Session getSession(JmsResourceHolder holder) { 458 return holder.getSession(); 459 } 460 461 462 /** 463 * ResourceFactory implementation that delegates to this listener container's protected callback methods. 464 */ 465 private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory { 466 467 @Override 468 public Connection getConnection(JmsResourceHolder holder) { 469 return AbstractPollingMessageListenerContainer.this.getConnection(holder); 470 } 471 472 @Override 473 public Session getSession(JmsResourceHolder holder) { 474 return AbstractPollingMessageListenerContainer.this.getSession(holder); 475 } 476 477 @Override 478 public Connection createConnection() throws JMSException { 479 if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) { 480 Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection(); 481 return new SingleConnectionFactory(sharedCon).createConnection(); 482 } 483 else { 484 return AbstractPollingMessageListenerContainer.this.createConnection(); 485 } 486 } 487 488 @Override 489 public Session createSession(Connection con) throws JMSException { 490 return AbstractPollingMessageListenerContainer.this.createSession(con); 491 } 492 493 @Override 494 public boolean isSynchedLocalTransactionAllowed() { 495 return AbstractPollingMessageListenerContainer.this.isSessionTransacted(); 496 } 497 } 498 499}