001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import javax.jms.Connection; 020import javax.jms.Destination; 021import javax.jms.ExceptionListener; 022import javax.jms.JMSException; 023import javax.jms.Message; 024import javax.jms.MessageConsumer; 025import javax.jms.MessageListener; 026import javax.jms.Queue; 027import javax.jms.Session; 028import javax.jms.Topic; 029 030import org.springframework.jms.support.JmsUtils; 031import org.springframework.jms.support.QosSettings; 032import org.springframework.jms.support.converter.MessageConverter; 033import org.springframework.lang.Nullable; 034import org.springframework.util.ErrorHandler; 035 036/** 037 * Abstract base class for Spring message listener container implementations. 038 * Can either host a standard JMS {@link javax.jms.MessageListener} or Spring's 039 * {@link SessionAwareMessageListener} for actual message processing. 040 * 041 * <p>Usually holds a single JMS {@link Connection} that all listeners are supposed 042 * to be registered on, which is the standard JMS way of managing listener sessions. 043 * Can alternatively also be used with a fresh Connection per listener, for Java EE 044 * style XA-aware JMS messaging. The actual registration process is up to concrete 045 * subclasses. 
046 * 047 * <p><b>NOTE:</b> The default behavior of this message listener container is to 048 * <b>never</b> propagate an exception thrown by a message listener up to the JMS 049 * provider. Instead, it will log any such exception at the warn level. 050 * This means that from the perspective of the attendant JMS provider no such 051 * listener will ever fail. However, if error handling is necessary, then 052 * any implementation of the {@link ErrorHandler} strategy may be provided to 053 * the {@link #setErrorHandler(ErrorHandler)} method. Note that JMSExceptions 054 * <b>will</b> be passed to the ErrorHandler in addition to (but after) being 055 * passed to an {@link ExceptionListener}, if one has been provided. 056 * 057 * <p>The listener container offers the following message acknowledgment options: 058 * <ul> 059 * <li>"sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): 060 * This mode is container-dependent: For {@link DefaultMessageListenerContainer}, 061 * it means automatic message acknowledgment <i>before</i> listener execution, with 062 * no redelivery in case of an exception and no redelivery in case of other listener 063 * execution interruptions either. For {@link SimpleMessageListenerContainer}, 064 * it means automatic message acknowledgment <i>after</i> listener execution, with 065 * no redelivery in case of a user exception thrown but potential redelivery in case 066 * of the JVM dying during listener execution. In order to consistently arrange for 067 * redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - 068 * preferably - setting "sessionTransacted" to "true" instead. 
069 * <li>"sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": 070 * <i>Lazy</i> message acknowledgment during ({@link DefaultMessageListenerContainer}) 071 * or shortly after ({@link SimpleMessageListenerContainer}) listener execution; 072 * no redelivery in case of a user exception thrown but potential redelivery in case 073 * of the JVM dying during listener execution. In order to consistently arrange for 074 * redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - 075 * preferably - setting "sessionTransacted" to "true" instead. 076 * <li>"sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": 077 * Automatic message acknowledgment <i>after</i> successful listener execution; 078 * best-effort redelivery in case of a user exception thrown as well as in case 079 * of other listener execution interruptions (such as the JVM dying). 080 * <li>"sessionTransacted" set to "true": 081 * Transactional acknowledgment after successful listener execution; 082 * <i>guaranteed redelivery</i> in case of a user exception thrown as well as 083 * in case of other listener execution interruptions (such as the JVM dying). 084 * </ul> 085 * 086 * <p>There are two solutions to the duplicate message processing problem: 087 * <ul> 088 * <li>Either add <i>duplicate message detection</i> to your listener, in the 089 * form of a business entity existence check or a protocol table check. This 090 * usually just needs to be done in case of the JMSRedelivered flag being 091 * set on the incoming message (otherwise just process straightforwardly). 092 * Note that with "sessionTransacted" set to "true", duplicate messages will 093 * only appear in case of the JVM dying at the most unfortunate point possible 094 * (i.e. after your business logic executed but before the JMS part got committed), 095 * so duplicate message detection is just there to cover a corner case. 
096 * <li>Or wrap your <i>entire processing with an XA transaction</i>, covering the 097 * reception of the JMS message as well as the execution of the business logic in 098 * your message listener (including database operations etc). This is only 099 * supported by {@link DefaultMessageListenerContainer}, through specifying 100 * an external "transactionManager" (typically a 101 * {@link org.springframework.transaction.jta.JtaTransactionManager}, with 102 * a corresponding XA-aware JMS {@link javax.jms.ConnectionFactory} passed in 103 * as "connectionFactory"). 104 * </ul> 105 * Note that XA transaction coordination adds significant runtime overhead, 106 * so it is advisable to avoid it unless absolutely necessary. 107 * 108 * <p><b>Recommendations:</b> 109 * <ul> 110 * <li>The general recommendation is to set "sessionTransacted" to "true", 111 * typically in combination with local database transactions triggered by the 112 * listener implementation, through Spring's standard transaction facilities. 113 * This will work nicely in Tomcat or in a standalone environment, often 114 * combined with custom duplicate message detection (if it is unacceptable 115 * to ever process the same message twice). 116 * <li>Alternatively, specify a 117 * {@link org.springframework.transaction.jta.JtaTransactionManager} as 118 * "transactionManager" for a fully XA-aware JMS provider - typically when 119 * running on a Java EE server, but also for other environments with a JTA 120 * transaction manager present. This will give full "exactly-once" guarantees 121 * without custom duplicate message checks, at the price of additional 122 * runtime processing overhead. 123 * </ul> 124 * 125 * <p>Note that the "sessionTransacted" flag is strongly recommended over 126 * {@link org.springframework.jms.connection.JmsTransactionManager}, provided 127 * that transactions do not need to be managed externally. 
As a consequence, 128 * set the transaction manager only if you are using JTA or if you need to 129 * synchronize with custom external transaction arrangements. 130 * 131 * @author Juergen Hoeller 132 * @author Stephane Nicoll 133 * @since 2.0 134 * @see #setMessageListener 135 * @see javax.jms.MessageListener 136 * @see SessionAwareMessageListener 137 * @see #handleListenerException 138 * @see DefaultMessageListenerContainer 139 * @see SimpleMessageListenerContainer 140 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager 141 */ 142public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer 143 implements MessageListenerContainer { 144 145 @Nullable 146 private volatile Object destination; 147 148 @Nullable 149 private volatile String messageSelector; 150 151 @Nullable 152 private volatile Object messageListener; 153 154 private boolean subscriptionDurable = false; 155 156 private boolean subscriptionShared = false; 157 158 @Nullable 159 private String subscriptionName; 160 161 @Nullable 162 private Boolean replyPubSubDomain; 163 164 @Nullable 165 private QosSettings replyQosSettings; 166 167 private boolean pubSubNoLocal = false; 168 169 @Nullable 170 private MessageConverter messageConverter; 171 172 @Nullable 173 private ExceptionListener exceptionListener; 174 175 @Nullable 176 private ErrorHandler errorHandler; 177 178 private boolean exposeListenerSession = true; 179 180 private boolean acceptMessagesWhileStopping = false; 181 182 183 /** 184 * Specify concurrency limits. 185 */ 186 public abstract void setConcurrency(String concurrency); 187 188 /** 189 * Set the destination to receive messages from. 190 * <p>Alternatively, specify a "destinationName", to be dynamically 191 * resolved via the {@link org.springframework.jms.support.destination.DestinationResolver}. 
192 * <p>Note: The destination may be replaced at runtime, with the listener 193 * container picking up the new destination immediately (works e.g. with 194 * DefaultMessageListenerContainer, as long as the cache level is less than 195 * CACHE_CONSUMER). However, this is considered advanced usage; use it with care! 196 * @see #setDestinationName(String) 197 */ 198 public void setDestination(@Nullable Destination destination) { 199 this.destination = destination; 200 if (destination instanceof Topic && !(destination instanceof Queue)) { 201 // Clearly a Topic: let's set the "pubSubDomain" flag accordingly. 202 setPubSubDomain(true); 203 } 204 } 205 206 /** 207 * Return the destination to receive messages from. Will be {@code null} 208 * if the configured destination is not an actual {@link Destination} type; 209 * c.f. {@link #setDestinationName(String) when the destination is a String}. 210 */ 211 @Nullable 212 public Destination getDestination() { 213 return (this.destination instanceof Destination ? (Destination) this.destination : null); 214 } 215 216 /** 217 * Set the name of the destination to receive messages from. 218 * <p>The specified name will be dynamically resolved via the configured 219 * {@link #setDestinationResolver destination resolver}. 220 * <p>Alternatively, specify a JMS {@link Destination} object as "destination". 221 * <p>Note: The destination may be replaced at runtime, with the listener 222 * container picking up the new destination immediately (works e.g. with 223 * DefaultMessageListenerContainer, as long as the cache level is less than 224 * CACHE_CONSUMER). However, this is considered advanced usage; use it with care! 225 * @see #setDestination(javax.jms.Destination) 226 */ 227 public void setDestinationName(@Nullable String destinationName) { 228 this.destination = destinationName; 229 } 230 231 /** 232 * Return the name of the destination to receive messages from. 
233 * Will be {@code null} if the configured destination is not a 234 * {@link String} type; c.f. {@link #setDestination(Destination) when 235 * it is an actual Destination}. 236 */ 237 @Nullable 238 public String getDestinationName() { 239 return (this.destination instanceof String ? (String) this.destination : null); 240 } 241 242 /** 243 * Return a descriptive String for this container's JMS destination 244 * (never {@code null}). 245 */ 246 protected String getDestinationDescription() { 247 Object destination = this.destination; 248 return (destination != null ? destination.toString() : ""); 249 } 250 251 /** 252 * Set the JMS message selector expression (or {@code null} if none). 253 * Default is none. 254 * <p>See the JMS specification for a detailed definition of selector expressions. 255 * <p>Note: The message selector may be replaced at runtime, with the listener 256 * container picking up the new selector value immediately (works e.g. with 257 * DefaultMessageListenerContainer, as long as the cache level is less than 258 * CACHE_CONSUMER). However, this is considered advanced usage; use it with care! 259 */ 260 public void setMessageSelector(@Nullable String messageSelector) { 261 this.messageSelector = messageSelector; 262 } 263 264 /** 265 * Return the JMS message selector expression (or {@code null} if none). 266 */ 267 @Nullable 268 public String getMessageSelector() { 269 return this.messageSelector; 270 } 271 272 273 /** 274 * Set the message listener implementation to register. 275 * This can be either a standard JMS {@link MessageListener} object 276 * or a Spring {@link SessionAwareMessageListener} object. 277 * <p>Note: The message listener may be replaced at runtime, with the listener 278 * container picking up the new listener object immediately (works e.g. with 279 * DefaultMessageListenerContainer, as long as the cache level is less than 280 * CACHE_CONSUMER). However, this is considered advanced usage; use it with care! 
281 * @throws IllegalArgumentException if the supplied listener is not a 282 * {@link MessageListener} or a {@link SessionAwareMessageListener} 283 * @see javax.jms.MessageListener 284 * @see SessionAwareMessageListener 285 */ 286 public void setMessageListener(@Nullable Object messageListener) { 287 checkMessageListener(messageListener); 288 this.messageListener = messageListener; 289 if (messageListener != null && this.subscriptionName == null) { 290 this.subscriptionName = getDefaultSubscriptionName(messageListener); 291 } 292 } 293 294 /** 295 * Return the message listener object to register. 296 */ 297 @Nullable 298 public Object getMessageListener() { 299 return this.messageListener; 300 } 301 302 /** 303 * Check the given message listener, throwing an exception 304 * if it does not correspond to a supported listener type. 305 * <p>By default, only a standard JMS {@link MessageListener} object or a 306 * Spring {@link SessionAwareMessageListener} object will be accepted. 307 * @param messageListener the message listener object to check 308 * @throws IllegalArgumentException if the supplied listener is not a 309 * {@link MessageListener} or a {@link SessionAwareMessageListener} 310 * @see javax.jms.MessageListener 311 * @see SessionAwareMessageListener 312 */ 313 protected void checkMessageListener(@Nullable Object messageListener) { 314 if (messageListener != null && !(messageListener instanceof MessageListener || 315 messageListener instanceof SessionAwareMessageListener)) { 316 throw new IllegalArgumentException( 317 "Message listener needs to be of type [" + MessageListener.class.getName() + 318 "] or [" + SessionAwareMessageListener.class.getName() + "]"); 319 } 320 } 321 322 /** 323 * Determine the default subscription name for the given message listener. 
324 * @param messageListener the message listener object to check 325 * @return the default subscription name 326 * @see SubscriptionNameProvider 327 */ 328 protected String getDefaultSubscriptionName(Object messageListener) { 329 if (messageListener instanceof SubscriptionNameProvider) { 330 return ((SubscriptionNameProvider) messageListener).getSubscriptionName(); 331 } 332 else { 333 return messageListener.getClass().getName(); 334 } 335 } 336 337 /** 338 * Set whether to make the subscription durable. The durable subscription name 339 * to be used can be specified through the "subscriptionName" property. 340 * <p>Default is "false". Set this to "true" to register a durable subscription, 341 * typically in combination with a "subscriptionName" value (unless 342 * your message listener class name is good enough as subscription name). 343 * <p>Only makes sense when listening to a topic (pub-sub domain), 344 * therefore this method switches the "pubSubDomain" flag as well. 345 * @see #setSubscriptionName 346 * @see #setPubSubDomain 347 */ 348 public void setSubscriptionDurable(boolean subscriptionDurable) { 349 this.subscriptionDurable = subscriptionDurable; 350 if (subscriptionDurable) { 351 setPubSubDomain(true); 352 } 353 } 354 355 /** 356 * Return whether to make the subscription durable. 357 */ 358 public boolean isSubscriptionDurable() { 359 return this.subscriptionDurable; 360 } 361 362 /** 363 * Set whether to make the subscription shared. The shared subscription name 364 * to be used can be specified through the "subscriptionName" property. 365 * <p>Default is "false". Set this to "true" to register a shared subscription, 366 * typically in combination with a "subscriptionName" value (unless 367 * your message listener class name is good enough as subscription name). 368 * Note that shared subscriptions may also be durable, so this flag can 369 * (and often will) be combined with "subscriptionDurable" as well. 
370 * <p>Only makes sense when listening to a topic (pub-sub domain), 371 * therefore this method switches the "pubSubDomain" flag as well. 372 * <p><b>Requires a JMS 2.0 compatible message broker.</b> 373 * @since 4.1 374 * @see #setSubscriptionName 375 * @see #setSubscriptionDurable 376 * @see #setPubSubDomain 377 */ 378 public void setSubscriptionShared(boolean subscriptionShared) { 379 this.subscriptionShared = subscriptionShared; 380 if (subscriptionShared) { 381 setPubSubDomain(true); 382 } 383 } 384 385 /** 386 * Return whether to make the subscription shared. 387 * @since 4.1 388 */ 389 public boolean isSubscriptionShared() { 390 return this.subscriptionShared; 391 } 392 393 /** 394 * Set the name of a subscription to create. To be applied in case 395 * of a topic (pub-sub domain) with a shared or durable subscription. 396 * <p>The subscription name needs to be unique within this client's 397 * JMS client id. Default is the class name of the specified message listener. 398 * <p>Note: Only 1 concurrent consumer (which is the default of this 399 * message listener container) is allowed for each subscription, 400 * except for a shared subscription (which requires JMS 2.0). 401 * @since 4.1 402 * @see #setPubSubDomain 403 * @see #setSubscriptionDurable 404 * @see #setSubscriptionShared 405 * @see #setClientId 406 * @see #setMessageListener 407 */ 408 public void setSubscriptionName(@Nullable String subscriptionName) { 409 this.subscriptionName = subscriptionName; 410 } 411 412 /** 413 * Return the name of a subscription to create, if any. 414 * @since 4.1 415 */ 416 @Nullable 417 public String getSubscriptionName() { 418 return this.subscriptionName; 419 } 420 421 /** 422 * Set the name of a durable subscription to create. This method switches 423 * to pub-sub domain mode and activates subscription durability as well. 424 * <p>The durable subscription name needs to be unique within this client's 425 * JMS client id. 
Default is the class name of the specified message listener. 426 * <p>Note: Only 1 concurrent consumer (which is the default of this 427 * message listener container) is allowed for each durable subscription, 428 * except for a shared durable subscription (which requires JMS 2.0). 429 * @see #setPubSubDomain 430 * @see #setSubscriptionDurable 431 * @see #setSubscriptionShared 432 * @see #setClientId 433 * @see #setMessageListener 434 */ 435 public void setDurableSubscriptionName(@Nullable String durableSubscriptionName) { 436 this.subscriptionName = durableSubscriptionName; 437 this.subscriptionDurable = (durableSubscriptionName != null); 438 } 439 440 /** 441 * Return the name of a durable subscription to create, if any. 442 */ 443 @Nullable 444 public String getDurableSubscriptionName() { 445 return (this.subscriptionDurable ? this.subscriptionName : null); 446 } 447 448 /** 449 * Set whether to inhibit the delivery of messages published by its own connection. 450 * Default is "false". 451 * @since 4.1 452 * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean) 453 */ 454 public void setPubSubNoLocal(boolean pubSubNoLocal) { 455 this.pubSubNoLocal = pubSubNoLocal; 456 } 457 458 /** 459 * Return whether to inhibit the delivery of messages published by its own connection. 460 * @since 4.1 461 */ 462 public boolean isPubSubNoLocal() { 463 return this.pubSubNoLocal; 464 } 465 466 /** 467 * Configure the reply destination type. By default, the configured {@code pubSubDomain} 468 * value is used (see {@link #isPubSubDomain()}. 469 * <p>This setting primarily indicates what type of destination to resolve if dynamic 470 * destinations are enabled. 
471 * @param replyPubSubDomain "true" for the Publish/Subscribe domain ({@link Topic Topics}), 472 * "false" for the Point-to-Point domain ({@link Queue Queues}) 473 * @since 4.2 474 * @see #setDestinationResolver 475 */ 476 public void setReplyPubSubDomain(boolean replyPubSubDomain) { 477 this.replyPubSubDomain = replyPubSubDomain; 478 } 479 480 /** 481 * Return whether the Publish/Subscribe domain ({@link javax.jms.Topic Topics}) is used 482 * for replies. Otherwise, the Point-to-Point domain ({@link javax.jms.Queue Queues}) 483 * is used. 484 * @since 4.2 485 */ 486 @Override 487 public boolean isReplyPubSubDomain() { 488 if (this.replyPubSubDomain != null) { 489 return this.replyPubSubDomain; 490 } 491 else { 492 return isPubSubDomain(); 493 } 494 } 495 496 /** 497 * Configure the {@link QosSettings} to use when sending a reply. Can be set to 498 * {@code null} to indicate that the broker's defaults should be used. 499 * @param replyQosSettings the QoS settings to use when sending a reply or {@code null} 500 * to use the default vas. 501 * @since 5.0 502 */ 503 public void setReplyQosSettings(@Nullable QosSettings replyQosSettings) { 504 this.replyQosSettings = replyQosSettings; 505 } 506 507 @Override 508 @Nullable 509 public QosSettings getReplyQosSettings() { 510 return this.replyQosSettings; 511 } 512 513 /** 514 * Set the {@link MessageConverter} strategy for converting JMS Messages. 515 * @since 4.1 516 */ 517 public void setMessageConverter(@Nullable MessageConverter messageConverter) { 518 this.messageConverter = messageConverter; 519 } 520 521 @Override 522 @Nullable 523 public MessageConverter getMessageConverter() { 524 return this.messageConverter; 525 } 526 527 /** 528 * Set the JMS ExceptionListener to notify in case of a JMSException thrown 529 * by the registered message listener or the invocation infrastructure. 
530 */ 531 public void setExceptionListener(@Nullable ExceptionListener exceptionListener) { 532 this.exceptionListener = exceptionListener; 533 } 534 535 /** 536 * Return the JMS ExceptionListener to notify in case of a JMSException thrown 537 * by the registered message listener or the invocation infrastructure, if any. 538 */ 539 @Nullable 540 public ExceptionListener getExceptionListener() { 541 return this.exceptionListener; 542 } 543 544 /** 545 * Set the ErrorHandler to be invoked in case of any uncaught exceptions thrown 546 * while processing a Message. 547 * <p>By default, there will be <b>no</b> ErrorHandler so that error-level 548 * logging is the only result. 549 */ 550 public void setErrorHandler(@Nullable ErrorHandler errorHandler) { 551 this.errorHandler = errorHandler; 552 } 553 554 /** 555 * Return the ErrorHandler to be invoked in case of any uncaught exceptions thrown 556 * while processing a Message. 557 * @since 4.1 558 */ 559 @Nullable 560 public ErrorHandler getErrorHandler() { 561 return this.errorHandler; 562 } 563 564 /** 565 * Set whether to expose the listener JMS Session to a registered 566 * {@link SessionAwareMessageListener} as well as to 567 * {@link org.springframework.jms.core.JmsTemplate} calls. 568 * <p>Default is "true", reusing the listener's {@link Session}. 569 * Turn this off to expose a fresh JMS Session fetched from the same 570 * underlying JMS {@link Connection} instead, which might be necessary 571 * on some JMS providers. 572 * <p>Note that Sessions managed by an external transaction manager will 573 * always get exposed to {@link org.springframework.jms.core.JmsTemplate} 574 * calls. So in terms of JmsTemplate exposure, this setting only affects 575 * locally transacted Sessions. 
576 * @see SessionAwareMessageListener 577 */ 578 public void setExposeListenerSession(boolean exposeListenerSession) { 579 this.exposeListenerSession = exposeListenerSession; 580 } 581 582 /** 583 * Return whether to expose the listener JMS {@link Session} to a 584 * registered {@link SessionAwareMessageListener}. 585 */ 586 public boolean isExposeListenerSession() { 587 return this.exposeListenerSession; 588 } 589 590 /** 591 * Set whether to accept received messages while the listener container 592 * in the process of stopping. 593 * <p>Default is "false", rejecting such messages through aborting the 594 * receive attempt. Switch this flag on to fully process such messages 595 * even in the stopping phase, with the drawback that even newly sent 596 * messages might still get processed (if coming in before all receive 597 * timeouts have expired). 598 * <p><b>NOTE:</b> Aborting receive attempts for such incoming messages 599 * might lead to the provider's retry count decreasing for the affected 600 * messages. If you have a high number of concurrent consumers, make sure 601 * that the number of retries is higher than the number of consumers, 602 * to be on the safe side for all potential stopping scenarios. 603 */ 604 public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { 605 this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; 606 } 607 608 /** 609 * Return whether to accept received messages while the listener container 610 * in the process of stopping. 
611 */ 612 public boolean isAcceptMessagesWhileStopping() { 613 return this.acceptMessagesWhileStopping; 614 } 615 616 @Override 617 protected void validateConfiguration() { 618 if (this.destination == null) { 619 throw new IllegalArgumentException("Property 'destination' or 'destinationName' is required"); 620 } 621 } 622 623 @Override 624 public void setupMessageListener(Object messageListener) { 625 setMessageListener(messageListener); 626 } 627 628 629 //------------------------------------------------------------------------- 630 // Template methods for listener execution 631 //------------------------------------------------------------------------- 632 633 /** 634 * Execute the specified listener, 635 * committing or rolling back the transaction afterwards (if necessary). 636 * @param session the JMS Session to operate on 637 * @param message the received JMS Message 638 * @see #invokeListener 639 * @see #commitIfNecessary 640 * @see #rollbackOnExceptionIfNecessary 641 * @see #handleListenerException 642 */ 643 protected void executeListener(Session session, Message message) { 644 try { 645 doExecuteListener(session, message); 646 } 647 catch (Throwable ex) { 648 handleListenerException(ex); 649 } 650 } 651 652 /** 653 * Execute the specified listener, 654 * committing or rolling back the transaction afterwards (if necessary). 
655 * @param session the JMS Session to operate on 656 * @param message the received JMS Message 657 * @throws JMSException if thrown by JMS API methods 658 * @see #invokeListener 659 * @see #commitIfNecessary 660 * @see #rollbackOnExceptionIfNecessary 661 * @see #convertJmsAccessException 662 */ 663 protected void doExecuteListener(Session session, Message message) throws JMSException { 664 if (!isAcceptMessagesWhileStopping() && !isRunning()) { 665 if (logger.isWarnEnabled()) { 666 logger.warn("Rejecting received message because of the listener container " + 667 "having been stopped in the meantime: " + message); 668 } 669 rollbackIfNecessary(session); 670 throw new MessageRejectedWhileStoppingException(); 671 } 672 673 try { 674 invokeListener(session, message); 675 } 676 catch (JMSException | RuntimeException | Error ex) { 677 rollbackOnExceptionIfNecessary(session, ex); 678 throw ex; 679 } 680 commitIfNecessary(session, message); 681 } 682 683 /** 684 * Invoke the specified listener: either as standard JMS MessageListener 685 * or (preferably) as Spring SessionAwareMessageListener. 
686 * @param session the JMS Session to operate on 687 * @param message the received JMS Message 688 * @throws JMSException if thrown by JMS API methods 689 * @see #setMessageListener 690 */ 691 @SuppressWarnings("rawtypes") 692 protected void invokeListener(Session session, Message message) throws JMSException { 693 Object listener = getMessageListener(); 694 695 if (listener instanceof SessionAwareMessageListener) { 696 doInvokeListener((SessionAwareMessageListener) listener, session, message); 697 } 698 else if (listener instanceof MessageListener) { 699 doInvokeListener((MessageListener) listener, message); 700 } 701 else if (listener != null) { 702 throw new IllegalArgumentException( 703 "Only MessageListener and SessionAwareMessageListener supported: " + listener); 704 } 705 else { 706 throw new IllegalStateException("No message listener specified - see property 'messageListener'"); 707 } 708 } 709 710 /** 711 * Invoke the specified listener as Spring SessionAwareMessageListener, 712 * exposing a new JMS Session (potentially with its own transaction) 713 * to the listener if demanded. 714 * @param listener the Spring SessionAwareMessageListener to invoke 715 * @param session the JMS Session to operate on 716 * @param message the received JMS Message 717 * @throws JMSException if thrown by JMS API methods 718 * @see SessionAwareMessageListener 719 * @see #setExposeListenerSession 720 */ 721 @SuppressWarnings({"rawtypes", "unchecked"}) 722 protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message) 723 throws JMSException { 724 725 Connection conToClose = null; 726 Session sessionToClose = null; 727 try { 728 Session sessionToUse = session; 729 if (!isExposeListenerSession()) { 730 // We need to expose a separate Session. 731 conToClose = createConnection(); 732 sessionToClose = createSession(conToClose); 733 sessionToUse = sessionToClose; 734 } 735 // Actually invoke the message listener... 
736 listener.onMessage(message, sessionToUse); 737 // Clean up specially exposed Session, if any. 738 if (sessionToUse != session) { 739 if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) { 740 // Transacted session created by this container -> commit. 741 JmsUtils.commitIfNecessary(sessionToUse); 742 } 743 } 744 } 745 finally { 746 JmsUtils.closeSession(sessionToClose); 747 JmsUtils.closeConnection(conToClose); 748 } 749 } 750 751 /** 752 * Invoke the specified listener as standard JMS MessageListener. 753 * <p>Default implementation performs a plain invocation of the 754 * {@code onMessage} method. 755 * @param listener the JMS MessageListener to invoke 756 * @param message the received JMS Message 757 * @throws JMSException if thrown by JMS API methods 758 * @see javax.jms.MessageListener#onMessage 759 */ 760 protected void doInvokeListener(MessageListener listener, Message message) throws JMSException { 761 listener.onMessage(message); 762 } 763 764 /** 765 * Perform a commit or message acknowledgement, as appropriate. 766 * @param session the JMS Session to commit 767 * @param message the Message to acknowledge 768 * @throws javax.jms.JMSException in case of commit failure 769 */ 770 protected void commitIfNecessary(Session session, @Nullable Message message) throws JMSException { 771 // Commit session or acknowledge message. 772 if (session.getTransacted()) { 773 // Commit necessary - but avoid commit call within a JTA transaction. 774 if (isSessionLocallyTransacted(session)) { 775 // Transacted session created by this container -> commit. 776 JmsUtils.commitIfNecessary(session); 777 } 778 } 779 else if (message != null && isClientAcknowledge(session)) { 780 message.acknowledge(); 781 } 782 } 783 784 /** 785 * Perform a rollback, if appropriate. 
786 * @param session the JMS Session to rollback 787 * @throws javax.jms.JMSException in case of a rollback error 788 */ 789 protected void rollbackIfNecessary(Session session) throws JMSException { 790 if (session.getTransacted()) { 791 if (isSessionLocallyTransacted(session)) { 792 // Transacted session created by this container -> rollback. 793 JmsUtils.rollbackIfNecessary(session); 794 } 795 } 796 else if (isClientAcknowledge(session)) { 797 session.recover(); 798 } 799 } 800 801 /** 802 * Perform a rollback, handling rollback exceptions properly. 803 * @param session the JMS Session to rollback 804 * @param ex the thrown application exception or error 805 * @throws javax.jms.JMSException in case of a rollback error 806 */ 807 protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException { 808 try { 809 if (session.getTransacted()) { 810 if (isSessionLocallyTransacted(session)) { 811 // Transacted session created by this container -> rollback. 812 if (logger.isDebugEnabled()) { 813 logger.debug("Initiating transaction rollback on application exception", ex); 814 } 815 JmsUtils.rollbackIfNecessary(session); 816 } 817 } 818 else if (isClientAcknowledge(session)) { 819 session.recover(); 820 } 821 } 822 catch (IllegalStateException ex2) { 823 logger.debug("Could not roll back because Session already closed", ex2); 824 } 825 catch (JMSException | RuntimeException | Error ex2) { 826 logger.error("Application exception overridden by rollback error", ex); 827 throw ex2; 828 } 829 } 830 831 /** 832 * Check whether the given Session is locally transacted, that is, whether 833 * its transaction is managed by this listener container's Session handling 834 * and not by an external transaction coordinator. 835 * <p>Note: The Session's own transacted flag will already have been checked 836 * before. This method is about finding out whether the Session's transaction 837 * is local or externally coordinated. 
838 * @param session the Session to check 839 * @return whether the given Session is locally transacted 840 * @see #isSessionTransacted() 841 * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional 842 */ 843 protected boolean isSessionLocallyTransacted(Session session) { 844 return isSessionTransacted(); 845 } 846 847 /** 848 * Create a JMS MessageConsumer for the given Session and Destination. 849 * <p>This implementation uses JMS 1.1 API. 850 * @param session the JMS Session to create a MessageConsumer for 851 * @param destination the JMS Destination to create a MessageConsumer for 852 * @return the new JMS MessageConsumer 853 * @throws javax.jms.JMSException if thrown by JMS API methods 854 */ 855 protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException { 856 if (isPubSubDomain() && destination instanceof Topic) { 857 if (isSubscriptionShared()) { 858 return (isSubscriptionDurable() ? 859 session.createSharedDurableConsumer((Topic) destination, getSubscriptionName(), getMessageSelector()) : 860 session.createSharedConsumer((Topic) destination, getSubscriptionName(), getMessageSelector())); 861 } 862 else if (isSubscriptionDurable()) { 863 return session.createDurableSubscriber( 864 (Topic) destination, getSubscriptionName(), getMessageSelector(), isPubSubNoLocal()); 865 } 866 else { 867 // Only pass in the NoLocal flag in case of a Topic (pub-sub mode): 868 // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException 869 // in case of the NoLocal flag being specified for a Queue. 870 return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal()); 871 } 872 } 873 else { 874 return session.createConsumer(destination, getMessageSelector()); 875 } 876 } 877 878 /** 879 * Handle the given exception that arose during listener execution. 
880 * <p>The default implementation logs the exception at warn level, 881 * not propagating it to the JMS provider — assuming that all handling of 882 * acknowledgement and/or transactions is done by this listener container. 883 * This can be overridden in subclasses. 884 * @param ex the exception to handle 885 */ 886 protected void handleListenerException(Throwable ex) { 887 if (ex instanceof MessageRejectedWhileStoppingException) { 888 // Internal exception - has been handled before. 889 return; 890 } 891 if (ex instanceof JMSException) { 892 invokeExceptionListener((JMSException) ex); 893 } 894 if (isActive()) { 895 // Regular case: failed while active. 896 // Invoke ErrorHandler if available. 897 invokeErrorHandler(ex); 898 } 899 else { 900 // Rare case: listener thread failed after container shutdown. 901 // Log at debug level, to avoid spamming the shutdown log. 902 logger.debug("Listener exception after container shutdown", ex); 903 } 904 } 905 906 /** 907 * Invoke the registered JMS ExceptionListener, if any. 908 * @param ex the exception that arose during JMS processing 909 * @see #setExceptionListener 910 */ 911 protected void invokeExceptionListener(JMSException ex) { 912 ExceptionListener exceptionListener = getExceptionListener(); 913 if (exceptionListener != null) { 914 exceptionListener.onException(ex); 915 } 916 } 917 918 /** 919 * Invoke the registered ErrorHandler, if any. Log at warn level otherwise. 920 * @param ex the uncaught error that arose during JMS processing. 921 * @see #setErrorHandler 922 */ 923 protected void invokeErrorHandler(Throwable ex) { 924 ErrorHandler errorHandler = getErrorHandler(); 925 if (errorHandler != null) { 926 errorHandler.handleError(ex); 927 } 928 else { 929 logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex); 930 } 931 } 932 933 934 /** 935 * Internal exception class that indicates a rejected message on shutdown. 
* Used to trigger a rollback for an external transaction manager in that case.
	 */
	@SuppressWarnings("serial")
	private static class MessageRejectedWhileStoppingException extends RuntimeException {
	}

}