001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import java.lang.reflect.InvocationTargetException; 020import java.lang.reflect.Method; 021import javax.jms.Connection; 022import javax.jms.Destination; 023import javax.jms.ExceptionListener; 024import javax.jms.JMSException; 025import javax.jms.Message; 026import javax.jms.MessageConsumer; 027import javax.jms.MessageListener; 028import javax.jms.Queue; 029import javax.jms.Session; 030import javax.jms.Topic; 031 032import org.springframework.jms.support.JmsUtils; 033import org.springframework.jms.support.converter.MessageConverter; 034import org.springframework.util.Assert; 035import org.springframework.util.ClassUtils; 036import org.springframework.util.ErrorHandler; 037import org.springframework.util.ReflectionUtils; 038 039/** 040 * Abstract base class for Spring message listener container implementations. 041 * Can either host a standard JMS {@link javax.jms.MessageListener} or Spring's 042 * {@link SessionAwareMessageListener} for actual message processing. 043 * 044 * <p>Usually holds a single JMS {@link Connection} that all listeners are supposed 045 * to be registered on, which is the standard JMS way of managing listener sessions. 046 * Can alternatively also be used with a fresh Connection per listener, for Java EE 047 * style XA-aware JMS messaging. 
The actual registration process is up to concrete 048 * subclasses. 049 * 050 * <p><b>NOTE:</b> The default behavior of this message listener container is to 051 * <b>never</b> propagate an exception thrown by a message listener up to the JMS 052 * provider. Instead, it will log any such exception at the error level. 053 * This means that from the perspective of the attendant JMS provider no such 054 * listener will ever fail. However, if error handling is necessary, then 055 * any implementation of the {@link ErrorHandler} strategy may be provided to 056 * the {@link #setErrorHandler(ErrorHandler)} method. Note that JMSExceptions 057 * <b>will</b> be passed to the ErrorHandler in addition to (but after) being 058 * passed to an {@link ExceptionListener}, if one has been provided. 059 * 060 * <p>The listener container offers the following message acknowledgment options: 061 * <ul> 062 * <li>"sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): 063 * This mode is container-dependent: For {@link DefaultMessageListenerContainer}, 064 * it means automatic message acknowledgment <i>before</i> listener execution, with 065 * no redelivery in case of an exception and no redelivery in case of other listener 066 * execution interruptions either. For {@link SimpleMessageListenerContainer}, 067 * it means automatic message acknowledgment <i>after</i> listener execution, with 068 * no redelivery in case of a user exception thrown but potential redelivery in case 069 * of the JVM dying during listener execution. In order to consistently arrange for 070 * redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - 071 * preferably - setting "sessionTransacted" to "true" instead. 
072 * <li>"sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": 073 * <i>Lazy</i> message acknowledgment during ({@link DefaultMessageListenerContainer}) 074 * or shortly after ({@link SimpleMessageListenerContainer}) listener execution; 075 * no redelivery in case of a user exception thrown but potential redelivery in case 076 * of the JVM dying during listener execution. In order to consistently arrange for 077 * redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or - 078 * preferably - setting "sessionTransacted" to "true" instead. 079 * <li>"sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": 080 * Automatic message acknowledgment <i>after</i> successful listener execution; 081 * best-effort redelivery in case of a user exception thrown as well as in case 082 * of other listener execution interruptions (such as the JVM dying). 083 * <li>"sessionTransacted" set to "true": 084 * Transactional acknowledgment after successful listener execution; 085 * <i>guaranteed redelivery</i> in case of a user exception thrown as well as 086 * in case of other listener execution interruptions (such as the JVM dying). 087 * </ul> 088 * 089 * <p>There are two solutions to the duplicate message processing problem: 090 * <ul> 091 * <li>Either add <i>duplicate message detection</i> to your listener, in the 092 * form of a business entity existence check or a protocol table check. This 093 * usually just needs to be done in case of the JMSRedelivered flag being 094 * set on the incoming message (otherwise just process straightforwardly). 095 * Note that with "sessionTransacted" set to "true", duplicate messages will 096 * only appear in case of the JVM dying at the most unfortunate point possible 097 * (i.e. after your business logic executed but before the JMS part got committed), 098 * so duplicate message detection is just there to cover a corner case. 
099 * <li>Or wrap your <i>entire processing with an XA transaction</i>, covering the 100 * reception of the JMS message as well as the execution of the business logic in 101 * your message listener (including database operations etc). This is only 102 * supported by {@link DefaultMessageListenerContainer}, through specifying 103 * an external "transactionManager" (typically a 104 * {@link org.springframework.transaction.jta.JtaTransactionManager}, with 105 * a corresponding XA-aware JMS {@link javax.jms.ConnectionFactory} passed in 106 * as "connectionFactory"). 107 * </ul> 108 * Note that XA transaction coordination adds significant runtime overhead, 109 * so it might be advisable to avoid it unless absolutely necessary. 110 * 111 * <p><b>Recommendations:</b> 112 * <ul> 113 * <li>The general recommendation is to set "sessionTransacted" to "true", 114 * typically in combination with local database transactions triggered by the 115 * listener implementation, through Spring's standard transaction facilities. 116 * This will work nicely in Tomcat or in a standalone environment, often 117 * combined with custom duplicate message detection (if it is unacceptable 118 * to ever process the same message twice). 119 * <li>Alternatively, specify a 120 * {@link org.springframework.transaction.jta.JtaTransactionManager} as 121 * "transactionManager" for a fully XA-aware JMS provider - typically when 122 * running on a Java EE server, but also for other environments with a JTA 123 * transaction manager present. This will give full "exactly-once" guarantees 124 * without custom duplicate message checks, at the price of additional 125 * runtime processing overhead. 126 * </ul> 127 * 128 * <p>Note that the "sessionTransacted" flag is strongly recommended over 129 * {@link org.springframework.jms.connection.JmsTransactionManager}, provided 130 * that transactions do not need to be managed externally. 
As a consequence, 131 * set the transaction manager only if you are using JTA or if you need to 132 * synchronize with custom external transaction arrangements. 133 * 134 * @author Juergen Hoeller 135 * @author Stephane Nicoll 136 * @since 2.0 137 * @see #setMessageListener 138 * @see javax.jms.MessageListener 139 * @see SessionAwareMessageListener 140 * @see #handleListenerException 141 * @see DefaultMessageListenerContainer 142 * @see SimpleMessageListenerContainer 143 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager 144 */ 145public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer 146 implements MessageListenerContainer { 147 148 /** The JMS 2.0 Session.createSharedConsumer method, if available */ 149 private static final Method createSharedConsumerMethod = ClassUtils.getMethodIfAvailable( 150 Session.class, "createSharedConsumer", Topic.class, String.class, String.class); 151 152 /** The JMS 2.0 Session.createSharedDurableConsumer method, if available */ 153 private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable( 154 Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class); 155 156 157 private volatile Object destination; 158 159 private volatile String messageSelector; 160 161 private volatile Object messageListener; 162 163 private boolean subscriptionDurable = false; 164 165 private boolean subscriptionShared = false; 166 167 private String subscriptionName; 168 169 private Boolean replyPubSubDomain; 170 171 private boolean pubSubNoLocal = false; 172 173 private MessageConverter messageConverter; 174 175 private ExceptionListener exceptionListener; 176 177 private ErrorHandler errorHandler; 178 179 private boolean exposeListenerSession = true; 180 181 private boolean acceptMessagesWhileStopping = false; 182 183 184 /** 185 * Specify concurrency limits. 
186 */ 187 public abstract void setConcurrency(String concurrency); 188 189 /** 190 * Set the destination to receive messages from. 191 * <p>Alternatively, specify a "destinationName", to be dynamically 192 * resolved via the {@link org.springframework.jms.support.destination.DestinationResolver}. 193 * <p>Note: The destination may be replaced at runtime, with the listener 194 * container picking up the new destination immediately (works e.g. with 195 * DefaultMessageListenerContainer, as long as the cache level is less than 196 * CACHE_CONSUMER). However, this is considered advanced usage; use it with care! 197 * @see #setDestinationName(String) 198 */ 199 public void setDestination(Destination destination) { 200 this.destination = destination; 201 if (destination instanceof Topic && !(destination instanceof Queue)) { 202 // Clearly a Topic: let's set the "pubSubDomain" flag accordingly. 203 setPubSubDomain(true); 204 } 205 } 206 207 /** 208 * Return the destination to receive messages from. Will be {@code null} 209 * if the configured destination is not an actual {@link Destination} type; 210 * c.f. {@link #setDestinationName(String) when the destination is a String}. 211 */ 212 public Destination getDestination() { 213 return (this.destination instanceof Destination ? (Destination) this.destination : null); 214 } 215 216 /** 217 * Set the name of the destination to receive messages from. 218 * <p>The specified name will be dynamically resolved via the configured 219 * {@link #setDestinationResolver destination resolver}. 220 * <p>Alternatively, specify a JMS {@link Destination} object as "destination". 221 * <p>Note: The destination may be replaced at runtime, with the listener 222 * container picking up the new destination immediately (works e.g. with 223 * DefaultMessageListenerContainer, as long as the cache level is less than 224 * CACHE_CONSUMER). However, this is considered advanced usage; use it with care! 
225 * @see #setDestination(javax.jms.Destination) 226 */ 227 public void setDestinationName(String destinationName) { 228 this.destination = destinationName; 229 } 230 231 /** 232 * Return the name of the destination to receive messages from. 233 * Will be {@code null} if the configured destination is not a 234 * {@link String} type; c.f. {@link #setDestination(Destination) when 235 * it is an actual Destination}. 236 */ 237 public String getDestinationName() { 238 return (this.destination instanceof String ? (String) this.destination : null); 239 } 240 241 /** 242 * Return a descriptive String for this container's JMS destination 243 * (never {@code null}). 244 */ 245 protected String getDestinationDescription() { 246 Object destination = this.destination; 247 return (destination != null ? destination.toString() : ""); 248 } 249 250 /** 251 * Set the JMS message selector expression (or {@code null} if none). 252 * Default is none. 253 * <p>See the JMS specification for a detailed definition of selector expressions. 254 * <p>Note: The message selector may be replaced at runtime, with the listener 255 * container picking up the new selector value immediately (works e.g. with 256 * DefaultMessageListenerContainer, as long as the cache level is less than 257 * CACHE_CONSUMER). However, this is considered advanced usage; use it with care! 258 */ 259 public void setMessageSelector(String messageSelector) { 260 this.messageSelector = messageSelector; 261 } 262 263 /** 264 * Return the JMS message selector expression (or {@code null} if none). 265 */ 266 public String getMessageSelector() { 267 return this.messageSelector; 268 } 269 270 271 /** 272 * Set the message listener implementation to register. 273 * This can be either a standard JMS {@link MessageListener} object 274 * or a Spring {@link SessionAwareMessageListener} object. 
275 * <p>Note: The message listener may be replaced at runtime, with the listener 276 * container picking up the new listener object immediately (works e.g. with 277 * DefaultMessageListenerContainer, as long as the cache level is less than 278 * CACHE_CONSUMER). However, this is considered advanced usage; use it with care! 279 * @throws IllegalArgumentException if the supplied listener is not a 280 * {@link MessageListener} or a {@link SessionAwareMessageListener} 281 * @see javax.jms.MessageListener 282 * @see SessionAwareMessageListener 283 */ 284 public void setMessageListener(Object messageListener) { 285 checkMessageListener(messageListener); 286 this.messageListener = messageListener; 287 if (this.subscriptionName == null) { 288 this.subscriptionName = getDefaultSubscriptionName(messageListener); 289 } 290 } 291 292 /** 293 * Return the message listener object to register. 294 */ 295 public Object getMessageListener() { 296 return this.messageListener; 297 } 298 299 /** 300 * Check the given message listener, throwing an exception 301 * if it does not correspond to a supported listener type. 302 * <p>By default, only a standard JMS {@link MessageListener} object or a 303 * Spring {@link SessionAwareMessageListener} object will be accepted. 
304 * @param messageListener the message listener object to check 305 * @throws IllegalArgumentException if the supplied listener is not a 306 * {@link MessageListener} or a {@link SessionAwareMessageListener} 307 * @see javax.jms.MessageListener 308 * @see SessionAwareMessageListener 309 */ 310 protected void checkMessageListener(Object messageListener) { 311 if (!(messageListener instanceof MessageListener || 312 messageListener instanceof SessionAwareMessageListener)) { 313 throw new IllegalArgumentException( 314 "Message listener needs to be of type [" + MessageListener.class.getName() + 315 "] or [" + SessionAwareMessageListener.class.getName() + "]"); 316 } 317 } 318 319 /** 320 * Determine the default subscription name for the given message listener. 321 * @param messageListener the message listener object to check 322 * @return the default subscription name 323 * @see SubscriptionNameProvider 324 */ 325 protected String getDefaultSubscriptionName(Object messageListener) { 326 if (messageListener instanceof SubscriptionNameProvider) { 327 return ((SubscriptionNameProvider) messageListener).getSubscriptionName(); 328 } 329 else { 330 return messageListener.getClass().getName(); 331 } 332 } 333 334 /** 335 * Set whether to make the subscription durable. The durable subscription name 336 * to be used can be specified through the "subscriptionName" property. 337 * <p>Default is "false". Set this to "true" to register a durable subscription, 338 * typically in combination with a "subscriptionName" value (unless 339 * your message listener class name is good enough as subscription name). 340 * <p>Only makes sense when listening to a topic (pub-sub domain), 341 * therefore this method switches the "pubSubDomain" flag as well. 
342 * @see #setSubscriptionName 343 * @see #setPubSubDomain 344 */ 345 public void setSubscriptionDurable(boolean subscriptionDurable) { 346 this.subscriptionDurable = subscriptionDurable; 347 if (subscriptionDurable) { 348 setPubSubDomain(true); 349 } 350 } 351 352 /** 353 * Return whether to make the subscription durable. 354 */ 355 public boolean isSubscriptionDurable() { 356 return this.subscriptionDurable; 357 } 358 359 /** 360 * Set whether to make the subscription shared. The shared subscription name 361 * to be used can be specified through the "subscriptionName" property. 362 * <p>Default is "false". Set this to "true" to register a shared subscription, 363 * typically in combination with a "subscriptionName" value (unless 364 * your message listener class name is good enough as subscription name). 365 * Note that shared subscriptions may also be durable, so this flag can 366 * (and often will) be combined with "subscriptionDurable" as well. 367 * <p>Only makes sense when listening to a topic (pub-sub domain), 368 * therefore this method switches the "pubSubDomain" flag as well. 369 * <p><b>Requires a JMS 2.0 compatible message broker.</b> 370 * @since 4.1 371 * @see #setSubscriptionName 372 * @see #setSubscriptionDurable 373 * @see #setPubSubDomain 374 */ 375 public void setSubscriptionShared(boolean subscriptionShared) { 376 this.subscriptionShared = subscriptionShared; 377 if (subscriptionShared) { 378 setPubSubDomain(true); 379 } 380 } 381 382 /** 383 * Return whether to make the subscription shared. 384 * @since 4.1 385 */ 386 public boolean isSubscriptionShared() { 387 return this.subscriptionShared; 388 } 389 390 /** 391 * Set the name of a subscription to create. To be applied in case 392 * of a topic (pub-sub domain) with a shared or durable subscription. 393 * <p>The subscription name needs to be unique within this client's 394 * JMS client id. Default is the class name of the specified message listener. 
395 * <p>Note: Only 1 concurrent consumer (which is the default of this 396 * message listener container) is allowed for each subscription, 397 * except for a shared subscription (which requires JMS 2.0). 398 * @since 4.1 399 * @see #setPubSubDomain 400 * @see #setSubscriptionDurable 401 * @see #setSubscriptionShared 402 * @see #setClientId 403 * @see #setMessageListener 404 */ 405 public void setSubscriptionName(String subscriptionName) { 406 this.subscriptionName = subscriptionName; 407 } 408 409 /** 410 * Return the name of a subscription to create, if any. 411 * @since 4.1 412 */ 413 public String getSubscriptionName() { 414 return this.subscriptionName; 415 } 416 417 /** 418 * Set the name of a durable subscription to create. This method switches 419 * to pub-sub domain mode and activates subscription durability as well. 420 * <p>The durable subscription name needs to be unique within this client's 421 * JMS client id. Default is the class name of the specified message listener. 422 * <p>Note: Only 1 concurrent consumer (which is the default of this 423 * message listener container) is allowed for each durable subscription, 424 * except for a shared durable subscription (which requires JMS 2.0). 425 * @see #setPubSubDomain 426 * @see #setSubscriptionDurable 427 * @see #setSubscriptionShared 428 * @see #setClientId 429 * @see #setMessageListener 430 */ 431 public void setDurableSubscriptionName(String durableSubscriptionName) { 432 this.subscriptionName = durableSubscriptionName; 433 this.subscriptionDurable = (durableSubscriptionName != null); 434 } 435 436 /** 437 * Return the name of a durable subscription to create, if any. 438 */ 439 public String getDurableSubscriptionName() { 440 return (this.subscriptionDurable ? this.subscriptionName : null); 441 } 442 443 /** 444 * Set whether to inhibit the delivery of messages published by its own connection. 445 * Default is "false". 
446 * @since 4.1 447 * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean) 448 */ 449 public void setPubSubNoLocal(boolean pubSubNoLocal) { 450 this.pubSubNoLocal = pubSubNoLocal; 451 } 452 453 /** 454 * Return whether to inhibit the delivery of messages published by its own connection. 455 * @since 4.1 456 */ 457 public boolean isPubSubNoLocal() { 458 return this.pubSubNoLocal; 459 } 460 461 /** 462 * Configure the reply destination type. By default, the configured {@code pubSubDomain} 463 * value is used (see {@link #isPubSubDomain()}. 464 * <p>This setting primarily indicates what type of destination to resolve if dynamic 465 * destinations are enabled. 466 * @param replyPubSubDomain "true" for the Publish/Subscribe domain ({@link Topic Topics}), 467 * "false" for the Point-to-Point domain ({@link Queue Queues}) 468 * @since 4.2 469 * @see #setDestinationResolver 470 */ 471 public void setReplyPubSubDomain(boolean replyPubSubDomain) { 472 this.replyPubSubDomain = replyPubSubDomain; 473 } 474 475 /** 476 * Return whether the Publish/Subscribe domain ({@link javax.jms.Topic Topics}) is used 477 * for replies. Otherwise, the Point-to-Point domain ({@link javax.jms.Queue Queues}) 478 * is used. 479 * @since 4.2 480 */ 481 @Override 482 public boolean isReplyPubSubDomain() { 483 if (this.replyPubSubDomain != null) { 484 return replyPubSubDomain; 485 } 486 else { 487 return isPubSubDomain(); 488 } 489 } 490 491 /** 492 * Set the {@link MessageConverter} strategy for converting JMS Messages. 493 * @since 4.1 494 */ 495 public void setMessageConverter(MessageConverter messageConverter) { 496 this.messageConverter = messageConverter; 497 } 498 499 @Override 500 public MessageConverter getMessageConverter() { 501 return this.messageConverter; 502 } 503 504 /** 505 * Set the JMS ExceptionListener to notify in case of a JMSException thrown 506 * by the registered message listener or the invocation infrastructure. 
507 */ 508 public void setExceptionListener(ExceptionListener exceptionListener) { 509 this.exceptionListener = exceptionListener; 510 } 511 512 /** 513 * Return the JMS ExceptionListener to notify in case of a JMSException thrown 514 * by the registered message listener or the invocation infrastructure, if any. 515 */ 516 public ExceptionListener getExceptionListener() { 517 return this.exceptionListener; 518 } 519 520 /** 521 * Set the ErrorHandler to be invoked in case of any uncaught exceptions thrown 522 * while processing a Message. 523 * <p>By default, there will be <b>no</b> ErrorHandler so that error-level 524 * logging is the only result. 525 */ 526 public void setErrorHandler(ErrorHandler errorHandler) { 527 this.errorHandler = errorHandler; 528 } 529 530 /** 531 * Return the ErrorHandler to be invoked in case of any uncaught exceptions thrown 532 * while processing a Message. 533 * @since 4.1 534 */ 535 public ErrorHandler getErrorHandler() { 536 return this.errorHandler; 537 } 538 539 /** 540 * Set whether to expose the listener JMS Session to a registered 541 * {@link SessionAwareMessageListener} as well as to 542 * {@link org.springframework.jms.core.JmsTemplate} calls. 543 * <p>Default is "true", reusing the listener's {@link Session}. 544 * Turn this off to expose a fresh JMS Session fetched from the same 545 * underlying JMS {@link Connection} instead, which might be necessary 546 * on some JMS providers. 547 * <p>Note that Sessions managed by an external transaction manager will 548 * always get exposed to {@link org.springframework.jms.core.JmsTemplate} 549 * calls. So in terms of JmsTemplate exposure, this setting only affects 550 * locally transacted Sessions. 
551 * @see SessionAwareMessageListener 552 */ 553 public void setExposeListenerSession(boolean exposeListenerSession) { 554 this.exposeListenerSession = exposeListenerSession; 555 } 556 557 /** 558 * Return whether to expose the listener JMS {@link Session} to a 559 * registered {@link SessionAwareMessageListener}. 560 */ 561 public boolean isExposeListenerSession() { 562 return this.exposeListenerSession; 563 } 564 565 /** 566 * Set whether to accept received messages while the listener container 567 * in the process of stopping. 568 * <p>Default is "false", rejecting such messages through aborting the 569 * receive attempt. Switch this flag on to fully process such messages 570 * even in the stopping phase, with the drawback that even newly sent 571 * messages might still get processed (if coming in before all receive 572 * timeouts have expired). 573 * <p><b>NOTE:</b> Aborting receive attempts for such incoming messages 574 * might lead to the provider's retry count decreasing for the affected 575 * messages. If you have a high number of concurrent consumers, make sure 576 * that the number of retries is higher than the number of consumers, 577 * to be on the safe side for all potential stopping scenarios. 578 */ 579 public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) { 580 this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; 581 } 582 583 /** 584 * Return whether to accept received messages while the listener container 585 * in the process of stopping. 
586 */ 587 public boolean isAcceptMessagesWhileStopping() { 588 return this.acceptMessagesWhileStopping; 589 } 590 591 @Override 592 protected void validateConfiguration() { 593 if (this.destination == null) { 594 throw new IllegalArgumentException("Property 'destination' or 'destinationName' is required"); 595 } 596 } 597 598 @Override 599 public void setupMessageListener(Object messageListener) { 600 setMessageListener(messageListener); 601 } 602 603 604 //------------------------------------------------------------------------- 605 // Template methods for listener execution 606 //------------------------------------------------------------------------- 607 608 /** 609 * Execute the specified listener, 610 * committing or rolling back the transaction afterwards (if necessary). 611 * @param session the JMS Session to operate on 612 * @param message the received JMS Message 613 * @see #invokeListener 614 * @see #commitIfNecessary 615 * @see #rollbackOnExceptionIfNecessary 616 * @see #handleListenerException 617 */ 618 protected void executeListener(Session session, Message message) { 619 try { 620 doExecuteListener(session, message); 621 } 622 catch (Throwable ex) { 623 handleListenerException(ex); 624 } 625 } 626 627 /** 628 * Execute the specified listener, 629 * committing or rolling back the transaction afterwards (if necessary). 
630 * @param session the JMS Session to operate on 631 * @param message the received JMS Message 632 * @throws JMSException if thrown by JMS API methods 633 * @see #invokeListener 634 * @see #commitIfNecessary 635 * @see #rollbackOnExceptionIfNecessary 636 * @see #convertJmsAccessException 637 */ 638 protected void doExecuteListener(Session session, Message message) throws JMSException { 639 if (!isAcceptMessagesWhileStopping() && !isRunning()) { 640 if (logger.isWarnEnabled()) { 641 logger.warn("Rejecting received message because of the listener container " + 642 "having been stopped in the meantime: " + message); 643 } 644 rollbackIfNecessary(session); 645 throw new MessageRejectedWhileStoppingException(); 646 } 647 648 try { 649 invokeListener(session, message); 650 } 651 catch (JMSException ex) { 652 rollbackOnExceptionIfNecessary(session, ex); 653 throw ex; 654 } 655 catch (RuntimeException ex) { 656 rollbackOnExceptionIfNecessary(session, ex); 657 throw ex; 658 } 659 catch (Error err) { 660 rollbackOnExceptionIfNecessary(session, err); 661 throw err; 662 } 663 commitIfNecessary(session, message); 664 } 665 666 /** 667 * Invoke the specified listener: either as standard JMS MessageListener 668 * or (preferably) as Spring SessionAwareMessageListener. 
669 * @param session the JMS Session to operate on 670 * @param message the received JMS Message 671 * @throws JMSException if thrown by JMS API methods 672 * @see #setMessageListener 673 */ 674 @SuppressWarnings("rawtypes") 675 protected void invokeListener(Session session, Message message) throws JMSException { 676 Object listener = getMessageListener(); 677 678 if (listener instanceof SessionAwareMessageListener) { 679 doInvokeListener((SessionAwareMessageListener) listener, session, message); 680 } 681 else if (listener instanceof MessageListener) { 682 doInvokeListener((MessageListener) listener, message); 683 } 684 else if (listener != null) { 685 throw new IllegalArgumentException( 686 "Only MessageListener and SessionAwareMessageListener supported: " + listener); 687 } 688 else { 689 throw new IllegalStateException("No message listener specified - see property 'messageListener'"); 690 } 691 } 692 693 /** 694 * Invoke the specified listener as Spring SessionAwareMessageListener, 695 * exposing a new JMS Session (potentially with its own transaction) 696 * to the listener if demanded. 697 * @param listener the Spring SessionAwareMessageListener to invoke 698 * @param session the JMS Session to operate on 699 * @param message the received JMS Message 700 * @throws JMSException if thrown by JMS API methods 701 * @see SessionAwareMessageListener 702 * @see #setExposeListenerSession 703 */ 704 @SuppressWarnings({ "unchecked", "rawtypes" }) 705 protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message) 706 throws JMSException { 707 708 Connection conToClose = null; 709 Session sessionToClose = null; 710 try { 711 Session sessionToUse = session; 712 if (!isExposeListenerSession()) { 713 // We need to expose a separate Session. 714 conToClose = createConnection(); 715 sessionToClose = createSession(conToClose); 716 sessionToUse = sessionToClose; 717 } 718 // Actually invoke the message listener... 
719 listener.onMessage(message, sessionToUse); 720 // Clean up specially exposed Session, if any. 721 if (sessionToUse != session) { 722 if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) { 723 // Transacted session created by this container -> commit. 724 JmsUtils.commitIfNecessary(sessionToUse); 725 } 726 } 727 } 728 finally { 729 JmsUtils.closeSession(sessionToClose); 730 JmsUtils.closeConnection(conToClose); 731 } 732 } 733 734 /** 735 * Invoke the specified listener as standard JMS MessageListener. 736 * <p>Default implementation performs a plain invocation of the 737 * {@code onMessage} method. 738 * @param listener the JMS MessageListener to invoke 739 * @param message the received JMS Message 740 * @throws JMSException if thrown by JMS API methods 741 * @see javax.jms.MessageListener#onMessage 742 */ 743 protected void doInvokeListener(MessageListener listener, Message message) throws JMSException { 744 listener.onMessage(message); 745 } 746 747 /** 748 * Perform a commit or message acknowledgement, as appropriate. 749 * @param session the JMS Session to commit 750 * @param message the Message to acknowledge 751 * @throws javax.jms.JMSException in case of commit failure 752 */ 753 protected void commitIfNecessary(Session session, Message message) throws JMSException { 754 // Commit session or acknowledge message. 755 if (session.getTransacted()) { 756 // Commit necessary - but avoid commit call within a JTA transaction. 757 if (isSessionLocallyTransacted(session)) { 758 // Transacted session created by this container -> commit. 759 JmsUtils.commitIfNecessary(session); 760 } 761 } 762 else if (message != null && isClientAcknowledge(session)) { 763 message.acknowledge(); 764 } 765 } 766 767 /** 768 * Perform a rollback, if appropriate. 
769 * @param session the JMS Session to rollback 770 * @throws javax.jms.JMSException in case of a rollback error 771 */ 772 protected void rollbackIfNecessary(Session session) throws JMSException { 773 if (session.getTransacted()) { 774 if (isSessionLocallyTransacted(session)) { 775 // Transacted session created by this container -> rollback. 776 JmsUtils.rollbackIfNecessary(session); 777 } 778 } 779 else if (isClientAcknowledge(session)) { 780 session.recover(); 781 } 782 } 783 784 /** 785 * Perform a rollback, handling rollback exceptions properly. 786 * @param session the JMS Session to rollback 787 * @param ex the thrown application exception or error 788 * @throws javax.jms.JMSException in case of a rollback error 789 */ 790 protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException { 791 try { 792 if (session.getTransacted()) { 793 if (isSessionLocallyTransacted(session)) { 794 // Transacted session created by this container -> rollback. 795 if (logger.isDebugEnabled()) { 796 logger.debug("Initiating transaction rollback on application exception", ex); 797 } 798 JmsUtils.rollbackIfNecessary(session); 799 } 800 } 801 else if (isClientAcknowledge(session)) { 802 session.recover(); 803 } 804 } 805 catch (IllegalStateException ex2) { 806 logger.debug("Could not roll back because Session already closed", ex2); 807 } 808 catch (JMSException ex2) { 809 logger.error("Application exception overridden by rollback exception", ex); 810 throw ex2; 811 } 812 catch (RuntimeException ex2) { 813 logger.error("Application exception overridden by rollback exception", ex); 814 throw ex2; 815 } 816 catch (Error err) { 817 logger.error("Application exception overridden by rollback error", ex); 818 throw err; 819 } 820 } 821 822 /** 823 * Check whether the given Session is locally transacted, that is, whether 824 * its transaction is managed by this listener container's Session handling 825 * and not by an external transaction coordinator. 
* <p>Note: The Session's own transacted flag will already have been checked
 * before. This method is about finding out whether the Session's transaction
 * is local or externally coordinated.
 * @param session the Session to check
 * @return whether the given Session is locally transacted
 * @see #isSessionTransacted()
 * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
 */
protected boolean isSessionLocallyTransacted(Session session) {
	return isSessionTransacted();
}

/**
 * Create a JMS MessageConsumer for the given Session and Destination.
 * <p>This implementation uses JMS 1.1 API for plain and durable consumers.
 * Shared subscriptions are created by reflectively invoking the
 * {@code createSharedConsumerMethod} / {@code createSharedDurableConsumerMethod}
 * handles (JMS 2.0 API).
 * @param session the JMS Session to create a MessageConsumer for
 * @param destination the JMS Destination to create a MessageConsumer for
 * @return the new JMS MessageConsumer
 * @throws javax.jms.JMSException if thrown by JMS API methods
 * @throws IllegalStateException if the JMS 2.0 method required for a shared
 * subscription is not available or not accessible
 */
protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
	if (isPubSubDomain() && destination instanceof Topic) {
		if (isSubscriptionShared()) {
			// createSharedConsumer((Topic) dest, subscription, selector);
			// createSharedDurableConsumer((Topic) dest, subscription, selector);
			Method method = (isSubscriptionDurable() ?
					createSharedDurableConsumerMethod : createSharedConsumerMethod);
			// Fail with a descriptive error instead of a bare NullPointerException
			// from method.invoke below.
			// NOTE(review): the handles are initialized outside this chunk -
			// confirm they can actually be null (e.g. with a JMS 1.1 API jar).
			if (method == null) {
				throw new IllegalStateException(
						"JMS 2.0 API method for shared subscriptions is not available");
			}
			try {
				return (MessageConsumer) method.invoke(
						session, destination, getSubscriptionName(), getMessageSelector());
			}
			catch (InvocationTargetException ex) {
				// Unwrap provider exceptions so callers see the JMSException itself.
				if (ex.getTargetException() instanceof JMSException) {
					throw (JMSException) ex.getTargetException();
				}
				ReflectionUtils.handleInvocationTargetException(ex);
				// Presumably never reached, since the call above is documented to
				// rethrow; the return is needed to satisfy the compiler.
				return null;
			}
			catch (IllegalAccessException ex) {
				// Keep the original exception as the cause so its stack trace is not lost.
				throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage(), ex);
			}
		}
		else if (isSubscriptionDurable()) {
			return session.createDurableSubscriber(
					(Topic) destination, getSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
		}
		else {
			// Only pass in the NoLocal flag in case of a Topic (pub-sub mode):
			// Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
			// in case of the NoLocal flag being specified for a Queue.
			return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
		}
	}
	else {
		return session.createConsumer(destination, getMessageSelector());
	}
}

/**
 * Handle the given exception that arose during listener execution.
 * <p>The default implementation ignores the internal
 * {@code MessageRejectedWhileStoppingException}, passes any
 * {@link JMSException} to the registered ExceptionListener, and then
 * either invokes the ErrorHandler (while the container is active) or
 * logs the exception at debug level (after container shutdown).
 * The exception is not propagated to the JMS provider, assuming that all
 * handling of acknowledgement and/or transactions is done by this listener
 * container. This can be overridden in subclasses.
 * @param ex the exception to handle
 * @see #invokeExceptionListener
 * @see #invokeErrorHandler
 */
protected void handleListenerException(Throwable ex) {
	if (ex instanceof MessageRejectedWhileStoppingException) {
		// Internal exception - has been handled before.
		return;
	}
	if (ex instanceof JMSException) {
		invokeExceptionListener((JMSException) ex);
	}
	if (isActive()) {
		// Regular case: failed while active.
		// Invoke ErrorHandler if available.
		invokeErrorHandler(ex);
	}
	else {
		// Rare case: listener thread failed after container shutdown.
		// Log at debug level, to avoid spamming the shutdown log.
		logger.debug("Listener exception after container shutdown", ex);
	}
}

/**
 * Invoke the registered JMS ExceptionListener, if any.
 * Does nothing when no ExceptionListener is registered.
 * @param ex the exception that arose during JMS processing
 * @see #setExceptionListener
 */
protected void invokeExceptionListener(JMSException ex) {
	ExceptionListener exceptionListener = getExceptionListener();
	if (exceptionListener != null) {
		exceptionListener.onException(ex);
	}
}

/**
 * Invoke the registered ErrorHandler, if any. Log at warn level otherwise.
 * @param ex the uncaught error that arose during JMS processing.
 * @see #setErrorHandler
 */
protected void invokeErrorHandler(Throwable ex) {
	ErrorHandler errorHandler = getErrorHandler();
	if (errorHandler != null) {
		errorHandler.handleError(ex);
	}
	else {
		logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex);
	}
}


/**
 * Internal exception class that indicates a rejected message on shutdown.
 * Used to trigger a rollback for an external transaction manager in that case.
 */
@SuppressWarnings("serial")
private static class MessageRejectedWhileStoppingException extends RuntimeException {
}

}