001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import java.lang.reflect.InvocationTargetException;
020import java.lang.reflect.Method;
021import javax.jms.Connection;
022import javax.jms.Destination;
023import javax.jms.ExceptionListener;
024import javax.jms.JMSException;
025import javax.jms.Message;
026import javax.jms.MessageConsumer;
027import javax.jms.MessageListener;
028import javax.jms.Queue;
029import javax.jms.Session;
030import javax.jms.Topic;
031
032import org.springframework.jms.support.JmsUtils;
033import org.springframework.jms.support.converter.MessageConverter;
034import org.springframework.util.Assert;
035import org.springframework.util.ClassUtils;
036import org.springframework.util.ErrorHandler;
037import org.springframework.util.ReflectionUtils;
038
039/**
040 * Abstract base class for Spring message listener container implementations.
041 * Can either host a standard JMS {@link javax.jms.MessageListener} or Spring's
042 * {@link SessionAwareMessageListener} for actual message processing.
043 *
044 * <p>Usually holds a single JMS {@link Connection} that all listeners are supposed
045 * to be registered on, which is the standard JMS way of managing listener sessions.
046 * Can alternatively also be used with a fresh Connection per listener, for Java EE
047 * style XA-aware JMS messaging. The actual registration process is up to concrete
048 * subclasses.
049 *
050 * <p><b>NOTE:</b> The default behavior of this message listener container is to
051 * <b>never</b> propagate an exception thrown by a message listener up to the JMS
052 * provider. Instead, it will log any such exception at the error level.
053 * This means that from the perspective of the attendant JMS provider no such
054 * listener will ever fail. However, if error handling is necessary, then
055 * any implementation of the {@link ErrorHandler} strategy may be provided to
056 * the {@link #setErrorHandler(ErrorHandler)} method. Note that JMSExceptions
057 * <b>will</b> be passed to the ErrorHandler in addition to (but after) being
058 * passed to an {@link ExceptionListener}, if one has been provided.
059 *
060 * <p>The listener container offers the following message acknowledgment options:
061 * <ul>
062 * <li>"sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default):
063 * This mode is container-dependent: For {@link DefaultMessageListenerContainer},
064 * it means automatic message acknowledgment <i>before</i> listener execution, with
065 * no redelivery in case of an exception and no redelivery in case of other listener
066 * execution interruptions either. For {@link SimpleMessageListenerContainer},
067 * it means automatic message acknowledgment <i>after</i> listener execution, with
068 * no redelivery in case of a user exception thrown but potential redelivery in case
069 * of the JVM dying during listener execution. In order to consistently arrange for
070 * redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or -
071 * preferably - setting "sessionTransacted" to "true" instead.
072 * <li>"sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE":
073 * <i>Lazy</i> message acknowledgment during ({@link DefaultMessageListenerContainer})
074 * or shortly after ({@link SimpleMessageListenerContainer}) listener execution;
075 * no redelivery in case of a user exception thrown but potential redelivery in case
076 * of the JVM dying during listener execution. In order to consistently arrange for
077 * redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or -
078 * preferably - setting "sessionTransacted" to "true" instead.
079 * <li>"sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE":
080 * Automatic message acknowledgment <i>after</i> successful listener execution;
081 * best-effort redelivery in case of a user exception thrown as well as in case
082 * of other listener execution interruptions (such as the JVM dying).
083 * <li>"sessionTransacted" set to "true":
084 * Transactional acknowledgment after successful listener execution;
085 * <i>guaranteed redelivery</i> in case of a user exception thrown as well as
086 * in case of other listener execution interruptions (such as the JVM dying).
087 * </ul>
088 *
089 * <p>There are two solutions to the duplicate message processing problem:
090 * <ul>
091 * <li>Either add <i>duplicate message detection</i> to your listener, in the
092 * form of a business entity existence check or a protocol table check. This
093 * usually just needs to be done in case of the JMSRedelivered flag being
094 * set on the incoming message (otherwise just process straightforwardly).
095 * Note that with "sessionTransacted" set to "true", duplicate messages will
096 * only appear in case of the JVM dying at the most unfortunate point possible
097 * (i.e. after your business logic executed but before the JMS part got committed),
098 * so duplicate message detection is just there to cover a corner case.
099 * <li>Or wrap your <i>entire processing with an XA transaction</i>, covering the
100 * reception of the JMS message as well as the execution of the business logic in
101 * your message listener (including database operations etc). This is only
102 * supported by {@link DefaultMessageListenerContainer}, through specifying
103 * an external "transactionManager" (typically a
104 * {@link org.springframework.transaction.jta.JtaTransactionManager}, with
105 * a corresponding XA-aware JMS {@link javax.jms.ConnectionFactory} passed in
106 * as "connectionFactory").
107 * </ul>
 * Note that XA transaction coordination adds significant runtime overhead,
 * so it is advisable to avoid it unless absolutely necessary.
110 *
111 * <p><b>Recommendations:</b>
112 * <ul>
113 * <li>The general recommendation is to set "sessionTransacted" to "true",
114 * typically in combination with local database transactions triggered by the
115 * listener implementation, through Spring's standard transaction facilities.
116 * This will work nicely in Tomcat or in a standalone environment, often
117 * combined with custom duplicate message detection (if it is unacceptable
118 * to ever process the same message twice).
119 * <li>Alternatively, specify a
120 * {@link org.springframework.transaction.jta.JtaTransactionManager} as
121 * "transactionManager" for a fully XA-aware JMS provider - typically when
122 * running on a Java EE server, but also for other environments with a JTA
123 * transaction manager present. This will give full "exactly-once" guarantees
124 * without custom duplicate message checks, at the price of additional
125 * runtime processing overhead.
126 * </ul>
127 *
128 * <p>Note that the "sessionTransacted" flag is strongly recommended over
129 * {@link org.springframework.jms.connection.JmsTransactionManager}, provided
130 * that transactions do not need to be managed externally. As a consequence,
131 * set the transaction manager only if you are using JTA or if you need to
132 * synchronize with custom external transaction arrangements.
133 *
134 * @author Juergen Hoeller
135 * @author Stephane Nicoll
136 * @since 2.0
137 * @see #setMessageListener
138 * @see javax.jms.MessageListener
139 * @see SessionAwareMessageListener
140 * @see #handleListenerException
141 * @see DefaultMessageListenerContainer
142 * @see SimpleMessageListenerContainer
143 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
144 */
145public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer
146                implements MessageListenerContainer {
147
        /**
         * Reflective handle on the JMS 2.0 {@code Session.createSharedConsumer} method
         * taking {@code (Topic, String, String)}, if available.
         * <p>NOTE(review): presumably {@code null} when the JMS API on the classpath
         * predates 2.0 - confirm against {@code ClassUtils.getMethodIfAvailable}.
         */
        private static final Method createSharedConsumerMethod = ClassUtils.getMethodIfAvailable(
                        Session.class, "createSharedConsumer", Topic.class, String.class, String.class);

        /**
         * Reflective handle on the JMS 2.0 {@code Session.createSharedDurableConsumer} method
         * taking {@code (Topic, String, String)}, if available.
         * <p>NOTE(review): presumably {@code null} when the JMS API on the classpath
         * predates 2.0 - confirm against {@code ClassUtils.getMethodIfAvailable}.
         */
        private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
                        Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
155
156
        /**
         * Either a JMS {@code Destination} (see {@code setDestination}) or a destination
         * name String (see {@code setDestinationName}); volatile, as the setters are
         * documented to allow replacement at runtime.
         */
        private volatile Object destination;

        /** JMS message selector expression; {@code null} means no selector. */
        private volatile String messageSelector;

        /** The listener object to register, as accepted by {@code checkMessageListener}. */
        private volatile Object messageListener;

        /** Whether the subscription is durable. */
        private boolean subscriptionDurable = false;

        /** Whether the subscription is shared. */
        private boolean subscriptionShared = false;

        /** Subscription name; defaulted from the message listener if not set explicitly. */
        private String subscriptionName;

        /** Explicit reply domain; {@code null} means "fall back to {@code isPubSubDomain()}". */
        private Boolean replyPubSubDomain;

        /** Whether to inhibit delivery of messages published by the container's own connection. */
        private boolean pubSubNoLocal = false;

        /** Strategy for converting JMS Messages, if any. */
        private MessageConverter messageConverter;

        /** JMS ExceptionListener to notify about JMSExceptions, if any. */
        private ExceptionListener exceptionListener;

        /** Handler for uncaught exceptions thrown while processing a Message, if any. */
        private ErrorHandler errorHandler;

        /** Whether the listener's own JMS Session gets exposed (default: yes). */
        private boolean exposeListenerSession = true;

        /** Whether received messages are still accepted while the container is stopping. */
        private boolean acceptMessagesWhileStopping = false;
182
183
        /**
         * Specify concurrency limits.
         * <p>This method is abstract: how the given String is interpreted is
         * up to the concrete container implementation.
         * @param concurrency the concurrency limits, in String form
         */
        public abstract void setConcurrency(String concurrency);
188
189        /**
190         * Set the destination to receive messages from.
191         * <p>Alternatively, specify a "destinationName", to be dynamically
192         * resolved via the {@link org.springframework.jms.support.destination.DestinationResolver}.
193         * <p>Note: The destination may be replaced at runtime, with the listener
194         * container picking up the new destination immediately (works e.g. with
195         * DefaultMessageListenerContainer, as long as the cache level is less than
196         * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
197         * @see #setDestinationName(String)
198         */
199        public void setDestination(Destination destination) {
200                this.destination = destination;
201                if (destination instanceof Topic && !(destination instanceof Queue)) {
202                        // Clearly a Topic: let's set the "pubSubDomain" flag accordingly.
203                        setPubSubDomain(true);
204                }
205        }
206
207        /**
208         * Return the destination to receive messages from. Will be {@code null}
209         * if the configured destination is not an actual {@link Destination} type;
210         * c.f. {@link #setDestinationName(String) when the destination is a String}.
211         */
212        public Destination getDestination() {
213                return (this.destination instanceof Destination ? (Destination) this.destination : null);
214        }
215
        /**
         * Set the name of the destination to receive messages from.
         * <p>The specified name will be dynamically resolved via the configured
         * {@link #setDestinationResolver destination resolver}.
         * <p>Alternatively, specify a JMS {@link Destination} object as "destination".
         * Both setters write the same underlying field, so the most recent call wins.
         * <p>Note: The destination may be replaced at runtime, with the listener
         * container picking up the new destination immediately (works e.g. with
         * DefaultMessageListenerContainer, as long as the cache level is less than
         * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
         * @param destinationName the name of the destination to receive messages from
         * @see #setDestination(javax.jms.Destination)
         */
        public void setDestinationName(String destinationName) {
                this.destination = destinationName;
        }
230
231        /**
232         * Return the name of the destination to receive messages from.
233         * Will be {@code null} if the configured destination is not a
234         * {@link String} type; c.f. {@link #setDestination(Destination) when
235         * it is an actual Destination}.
236         */
237        public String getDestinationName() {
238                return (this.destination instanceof String ? (String) this.destination : null);
239        }
240
241        /**
242         * Return a descriptive String for this container's JMS destination
243         * (never {@code null}).
244         */
245        protected String getDestinationDescription() {
246                Object destination = this.destination;
247                return (destination != null ? destination.toString() : "");
248        }
249
        /**
         * Set the JMS message selector expression (or {@code null} if none).
         * Default is none.
         * <p>See the JMS specification for a detailed definition of selector expressions.
         * <p>Note: The message selector may be replaced at runtime, with the listener
         * container picking up the new selector value immediately (works e.g. with
         * DefaultMessageListenerContainer, as long as the cache level is less than
         * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
         * @param messageSelector the selector expression, or {@code null} for none
         */
        public void setMessageSelector(String messageSelector) {
                this.messageSelector = messageSelector;
        }
262
        /**
         * Return the JMS message selector expression (or {@code null} if none).
         * @return the selector expression as last set, or {@code null}
         */
        public String getMessageSelector() {
                return this.messageSelector;
        }
269
270
271        /**
272         * Set the message listener implementation to register.
273         * This can be either a standard JMS {@link MessageListener} object
274         * or a Spring {@link SessionAwareMessageListener} object.
275         * <p>Note: The message listener may be replaced at runtime, with the listener
276         * container picking up the new listener object immediately (works e.g. with
277         * DefaultMessageListenerContainer, as long as the cache level is less than
278         * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
279         * @throws IllegalArgumentException if the supplied listener is not a
280         * {@link MessageListener} or a {@link SessionAwareMessageListener}
281         * @see javax.jms.MessageListener
282         * @see SessionAwareMessageListener
283         */
284        public void setMessageListener(Object messageListener) {
285                checkMessageListener(messageListener);
286                this.messageListener = messageListener;
287                if (this.subscriptionName == null) {
288                        this.subscriptionName = getDefaultSubscriptionName(messageListener);
289                }
290        }
291
        /**
         * Return the message listener object to register.
         * @return the listener as last set, or {@code null} if none has been set
         */
        public Object getMessageListener() {
                return this.messageListener;
        }
298
299        /**
300         * Check the given message listener, throwing an exception
301         * if it does not correspond to a supported listener type.
302         * <p>By default, only a standard JMS {@link MessageListener} object or a
303         * Spring {@link SessionAwareMessageListener} object will be accepted.
304         * @param messageListener the message listener object to check
305         * @throws IllegalArgumentException if the supplied listener is not a
306         * {@link MessageListener} or a {@link SessionAwareMessageListener}
307         * @see javax.jms.MessageListener
308         * @see SessionAwareMessageListener
309         */
310        protected void checkMessageListener(Object messageListener) {
311                if (!(messageListener instanceof MessageListener ||
312                                messageListener instanceof SessionAwareMessageListener)) {
313                        throw new IllegalArgumentException(
314                                        "Message listener needs to be of type [" + MessageListener.class.getName() +
315                                        "] or [" + SessionAwareMessageListener.class.getName() + "]");
316                }
317        }
318
319        /**
320         * Determine the default subscription name for the given message listener.
321         * @param messageListener the message listener object to check
322         * @return the default subscription name
323         * @see SubscriptionNameProvider
324         */
325        protected String getDefaultSubscriptionName(Object messageListener) {
326                if (messageListener instanceof SubscriptionNameProvider) {
327                        return ((SubscriptionNameProvider) messageListener).getSubscriptionName();
328                }
329                else {
330                        return messageListener.getClass().getName();
331                }
332        }
333
334        /**
335         * Set whether to make the subscription durable. The durable subscription name
336         * to be used can be specified through the "subscriptionName" property.
337         * <p>Default is "false". Set this to "true" to register a durable subscription,
338         * typically in combination with a "subscriptionName" value (unless
339         * your message listener class name is good enough as subscription name).
340         * <p>Only makes sense when listening to a topic (pub-sub domain),
341         * therefore this method switches the "pubSubDomain" flag as well.
342         * @see #setSubscriptionName
343         * @see #setPubSubDomain
344         */
345        public void setSubscriptionDurable(boolean subscriptionDurable) {
346                this.subscriptionDurable = subscriptionDurable;
347                if (subscriptionDurable) {
348                        setPubSubDomain(true);
349                }
350        }
351
        /**
         * Return whether to make the subscription durable.
         * @return the current value of the "subscriptionDurable" flag
         */
        public boolean isSubscriptionDurable() {
                return this.subscriptionDurable;
        }
358
359        /**
360         * Set whether to make the subscription shared. The shared subscription name
361         * to be used can be specified through the "subscriptionName" property.
362         * <p>Default is "false". Set this to "true" to register a shared subscription,
363         * typically in combination with a "subscriptionName" value (unless
364         * your message listener class name is good enough as subscription name).
365         * Note that shared subscriptions may also be durable, so this flag can
366         * (and often will) be combined with "subscriptionDurable" as well.
367         * <p>Only makes sense when listening to a topic (pub-sub domain),
368         * therefore this method switches the "pubSubDomain" flag as well.
369         * <p><b>Requires a JMS 2.0 compatible message broker.</b>
370         * @since 4.1
371         * @see #setSubscriptionName
372         * @see #setSubscriptionDurable
373         * @see #setPubSubDomain
374         */
375        public void setSubscriptionShared(boolean subscriptionShared) {
376                this.subscriptionShared = subscriptionShared;
377                if (subscriptionShared) {
378                        setPubSubDomain(true);
379                }
380        }
381
        /**
         * Return whether to make the subscription shared.
         * @return the current value of the "subscriptionShared" flag
         * @since 4.1
         */
        public boolean isSubscriptionShared() {
                return this.subscriptionShared;
        }
389
        /**
         * Set the name of a subscription to create. To be applied in case
         * of a topic (pub-sub domain) with a shared or durable subscription.
         * <p>The subscription name needs to be unique within this client's
         * JMS client id. Default is the class name of the specified message listener.
         * <p>Unlike {@link #setDurableSubscriptionName}, this setter only stores the
         * name; it does not touch the durability flag.
         * <p>Note: Only 1 concurrent consumer (which is the default of this
         * message listener container) is allowed for each subscription,
         * except for a shared subscription (which requires JMS 2.0).
         * @param subscriptionName the name of the subscription to create
         * @since 4.1
         * @see #setPubSubDomain
         * @see #setSubscriptionDurable
         * @see #setSubscriptionShared
         * @see #setClientId
         * @see #setMessageListener
         */
        public void setSubscriptionName(String subscriptionName) {
                this.subscriptionName = subscriptionName;
        }
408
        /**
         * Return the name of a subscription to create, if any.
         * @return the subscription name, or {@code null} if none has been
         * set or derived from a message listener
         * @since 4.1
         */
        public String getSubscriptionName() {
                return this.subscriptionName;
        }
416
417        /**
418         * Set the name of a durable subscription to create. This method switches
419         * to pub-sub domain mode and activates subscription durability as well.
420         * <p>The durable subscription name needs to be unique within this client's
421         * JMS client id. Default is the class name of the specified message listener.
422         * <p>Note: Only 1 concurrent consumer (which is the default of this
423         * message listener container) is allowed for each durable subscription,
424         * except for a shared durable subscription (which requires JMS 2.0).
425         * @see #setPubSubDomain
426         * @see #setSubscriptionDurable
427         * @see #setSubscriptionShared
428         * @see #setClientId
429         * @see #setMessageListener
430         */
431        public void setDurableSubscriptionName(String durableSubscriptionName) {
432                this.subscriptionName = durableSubscriptionName;
433                this.subscriptionDurable = (durableSubscriptionName != null);
434        }
435
436        /**
437         * Return the name of a durable subscription to create, if any.
438         */
439        public String getDurableSubscriptionName() {
440                return (this.subscriptionDurable ? this.subscriptionName : null);
441        }
442
        /**
         * Set whether to inhibit the delivery of messages published by its own connection.
         * Default is "false".
         * @param pubSubNoLocal whether to inhibit delivery of locally published messages
         * @since 4.1
         * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean)
         */
        public void setPubSubNoLocal(boolean pubSubNoLocal) {
                this.pubSubNoLocal = pubSubNoLocal;
        }
452
        /**
         * Return whether to inhibit the delivery of messages published by its own connection.
         * @return the current value of the "pubSubNoLocal" flag
         * @since 4.1
         */
        public boolean isPubSubNoLocal() {
                return this.pubSubNoLocal;
        }
460
        /**
         * Configure the reply destination type. By default, the configured {@code pubSubDomain}
         * value is used (see {@link #isPubSubDomain()}).
         * <p>This setting primarily indicates what type of destination to resolve if dynamic
         * destinations are enabled.
         * @param replyPubSubDomain "true" for the Publish/Subscribe domain ({@link Topic Topics}),
         * "false" for the Point-to-Point domain ({@link Queue Queues})
         * @since 4.2
         * @see #setDestinationResolver
         */
        public void setReplyPubSubDomain(boolean replyPubSubDomain) {
                this.replyPubSubDomain = replyPubSubDomain;
        }
474
475        /**
476         * Return whether the Publish/Subscribe domain ({@link javax.jms.Topic Topics}) is used
477         * for replies. Otherwise, the Point-to-Point domain ({@link javax.jms.Queue Queues})
478         * is used.
479         * @since 4.2
480         */
481        @Override
482        public boolean isReplyPubSubDomain() {
483                if (this.replyPubSubDomain != null) {
484                        return replyPubSubDomain;
485                }
486                else {
487                        return isPubSubDomain();
488                }
489        }
490
        /**
         * Set the {@link MessageConverter} strategy for converting JMS Messages.
         * @param messageConverter the converter to use
         * @since 4.1
         */
        public void setMessageConverter(MessageConverter messageConverter) {
                this.messageConverter = messageConverter;
        }
498
        /**
         * Return the {@link MessageConverter} held by this container
         * ({@code null} if none has been assigned).
         */
        @Override
        public MessageConverter getMessageConverter() {
                return this.messageConverter;
        }
503
        /**
         * Set the JMS ExceptionListener to notify in case of a JMSException thrown
         * by the registered message listener or the invocation infrastructure.
         * @param exceptionListener the JMS ExceptionListener to notify
         */
        public void setExceptionListener(ExceptionListener exceptionListener) {
                this.exceptionListener = exceptionListener;
        }
511
        /**
         * Return the JMS ExceptionListener to notify in case of a JMSException thrown
         * by the registered message listener or the invocation infrastructure, if any.
         * @return the configured ExceptionListener, or {@code null} if none
         */
        public ExceptionListener getExceptionListener() {
                return this.exceptionListener;
        }
519
        /**
         * Set the ErrorHandler to be invoked in case of any uncaught exceptions thrown
         * while processing a Message.
         * <p>By default, there will be <b>no</b> ErrorHandler so that error-level
         * logging is the only result.
         * @param errorHandler the ErrorHandler to invoke
         */
        public void setErrorHandler(ErrorHandler errorHandler) {
                this.errorHandler = errorHandler;
        }
529
        /**
         * Return the ErrorHandler to be invoked in case of any uncaught exceptions thrown
         * while processing a Message.
         * @return the configured ErrorHandler, or {@code null} if none
         * @since 4.1
         */
        public ErrorHandler getErrorHandler() {
                return this.errorHandler;
        }
538
        /**
         * Set whether to expose the listener JMS Session to a registered
         * {@link SessionAwareMessageListener} as well as to
         * {@link org.springframework.jms.core.JmsTemplate} calls.
         * <p>Default is "true", reusing the listener's {@link Session}.
         * Turn this off to expose a fresh JMS Session fetched from the same
         * underlying JMS {@link Connection} instead, which might be necessary
         * on some JMS providers.
         * <p>Note that Sessions managed by an external transaction manager will
         * always get exposed to {@link org.springframework.jms.core.JmsTemplate}
         * calls. So in terms of JmsTemplate exposure, this setting only affects
         * locally transacted Sessions.
         * @param exposeListenerSession whether to expose the listener's own Session
         * @see SessionAwareMessageListener
         */
        public void setExposeListenerSession(boolean exposeListenerSession) {
                this.exposeListenerSession = exposeListenerSession;
        }
556
        /**
         * Return whether to expose the listener JMS {@link Session} to a
         * registered {@link SessionAwareMessageListener}.
         * @return the current value of the "exposeListenerSession" flag
         */
        public boolean isExposeListenerSession() {
                return this.exposeListenerSession;
        }
564
        /**
         * Set whether to accept received messages while the listener container
         * is in the process of stopping.
         * <p>Default is "false", rejecting such messages through aborting the
         * receive attempt. Switch this flag on to fully process such messages
         * even in the stopping phase, with the drawback that even newly sent
         * messages might still get processed (if coming in before all receive
         * timeouts have expired).
         * <p><b>NOTE:</b> Aborting receive attempts for such incoming messages
         * might lead to the provider's retry count decreasing for the affected
         * messages. If you have a high number of concurrent consumers, make sure
         * that the number of retries is higher than the number of consumers,
         * to be on the safe side for all potential stopping scenarios.
         * @param acceptMessagesWhileStopping whether to keep accepting messages
         * while stopping
         */
        public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
                this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
        }
582
        /**
         * Return whether to accept received messages while the listener container
         * is in the process of stopping.
         * @return the current value of the "acceptMessagesWhileStopping" flag
         */
        public boolean isAcceptMessagesWhileStopping() {
                return this.acceptMessagesWhileStopping;
        }
590
591        @Override
592        protected void validateConfiguration() {
593                if (this.destination == null) {
594                        throw new IllegalArgumentException("Property 'destination' or 'destinationName' is required");
595                }
596        }
597
        /**
         * Set up the given message listener by delegating to
         * {@link #setMessageListener}, which validates the listener type.
         * @param messageListener the listener object to register
         */
        @Override
        public void setupMessageListener(Object messageListener) {
                setMessageListener(messageListener);
        }
602
603
604        //-------------------------------------------------------------------------
605        // Template methods for listener execution
606        //-------------------------------------------------------------------------
607
        /**
         * Execute the specified listener,
         * committing or rolling back the transaction afterwards (if necessary).
         * <p>Delegates the actual work to {@link #doExecuteListener}. Nothing thrown
         * from there propagates to the caller: any {@link Throwable} is handed to
         * {@link #handleListenerException} instead.
         * @param session the JMS Session to operate on
         * @param message the received JMS Message
         * @see #invokeListener
         * @see #commitIfNecessary
         * @see #rollbackOnExceptionIfNecessary
         * @see #handleListenerException
         */
        protected void executeListener(Session session, Message message) {
                try {
                        doExecuteListener(session, message);
                }
                catch (Throwable ex) {
                        // Deliberately broad: covers JMSException, RuntimeException and Error alike.
                        handleListenerException(ex);
                }
        }
626
627        /**
628         * Execute the specified listener,
629         * committing or rolling back the transaction afterwards (if necessary).
630         * @param session the JMS Session to operate on
631         * @param message the received JMS Message
632         * @throws JMSException if thrown by JMS API methods
633         * @see #invokeListener
634         * @see #commitIfNecessary
635         * @see #rollbackOnExceptionIfNecessary
636         * @see #convertJmsAccessException
637         */
638        protected void doExecuteListener(Session session, Message message) throws JMSException {
639                if (!isAcceptMessagesWhileStopping() && !isRunning()) {
640                        if (logger.isWarnEnabled()) {
641                                logger.warn("Rejecting received message because of the listener container " +
642                                                "having been stopped in the meantime: " + message);
643                        }
644                        rollbackIfNecessary(session);
645                        throw new MessageRejectedWhileStoppingException();
646                }
647
648                try {
649                        invokeListener(session, message);
650                }
651                catch (JMSException ex) {
652                        rollbackOnExceptionIfNecessary(session, ex);
653                        throw ex;
654                }
655                catch (RuntimeException ex) {
656                        rollbackOnExceptionIfNecessary(session, ex);
657                        throw ex;
658                }
659                catch (Error err) {
660                        rollbackOnExceptionIfNecessary(session, err);
661                        throw err;
662                }
663                commitIfNecessary(session, message);
664        }
665
666        /**
667         * Invoke the specified listener: either as standard JMS MessageListener
668         * or (preferably) as Spring SessionAwareMessageListener.
669         * @param session the JMS Session to operate on
670         * @param message the received JMS Message
671         * @throws JMSException if thrown by JMS API methods
672         * @see #setMessageListener
673         */
674        @SuppressWarnings("rawtypes")
675        protected void invokeListener(Session session, Message message) throws JMSException {
676                Object listener = getMessageListener();
677
678                if (listener instanceof SessionAwareMessageListener) {
679                        doInvokeListener((SessionAwareMessageListener) listener, session, message);
680                }
681                else if (listener instanceof MessageListener) {
682                        doInvokeListener((MessageListener) listener, message);
683                }
684                else if (listener != null) {
685                        throw new IllegalArgumentException(
686                                        "Only MessageListener and SessionAwareMessageListener supported: " + listener);
687                }
688                else {
689                        throw new IllegalStateException("No message listener specified - see property 'messageListener'");
690                }
691        }
692
693        /**
694         * Invoke the specified listener as Spring SessionAwareMessageListener,
695         * exposing a new JMS Session (potentially with its own transaction)
696         * to the listener if demanded.
697         * @param listener the Spring SessionAwareMessageListener to invoke
698         * @param session the JMS Session to operate on
699         * @param message the received JMS Message
700         * @throws JMSException if thrown by JMS API methods
701         * @see SessionAwareMessageListener
702         * @see #setExposeListenerSession
703         */
704        @SuppressWarnings({ "unchecked", "rawtypes" })
705        protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message)
706                        throws JMSException {
707
708                Connection conToClose = null;
709                Session sessionToClose = null;
710                try {
711                        Session sessionToUse = session;
712                        if (!isExposeListenerSession()) {
713                                // We need to expose a separate Session.
714                                conToClose = createConnection();
715                                sessionToClose = createSession(conToClose);
716                                sessionToUse = sessionToClose;
717                        }
718                        // Actually invoke the message listener...
719                        listener.onMessage(message, sessionToUse);
720                        // Clean up specially exposed Session, if any.
721                        if (sessionToUse != session) {
722                                if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) {
723                                        // Transacted session created by this container -> commit.
724                                        JmsUtils.commitIfNecessary(sessionToUse);
725                                }
726                        }
727                }
728                finally {
729                        JmsUtils.closeSession(sessionToClose);
730                        JmsUtils.closeConnection(conToClose);
731                }
732        }
733
734        /**
735         * Invoke the specified listener as standard JMS MessageListener.
736         * <p>Default implementation performs a plain invocation of the
737         * {@code onMessage} method.
738         * @param listener the JMS MessageListener to invoke
739         * @param message the received JMS Message
740         * @throws JMSException if thrown by JMS API methods
741         * @see javax.jms.MessageListener#onMessage
742         */
743        protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {
744                listener.onMessage(message);
745        }
746
747        /**
748         * Perform a commit or message acknowledgement, as appropriate.
749         * @param session the JMS Session to commit
750         * @param message the Message to acknowledge
751         * @throws javax.jms.JMSException in case of commit failure
752         */
753        protected void commitIfNecessary(Session session, Message message) throws JMSException {
754                // Commit session or acknowledge message.
755                if (session.getTransacted()) {
756                        // Commit necessary - but avoid commit call within a JTA transaction.
757                        if (isSessionLocallyTransacted(session)) {
758                                // Transacted session created by this container -> commit.
759                                JmsUtils.commitIfNecessary(session);
760                        }
761                }
762                else if (message != null && isClientAcknowledge(session)) {
763                        message.acknowledge();
764                }
765        }
766
767        /**
768         * Perform a rollback, if appropriate.
769         * @param session the JMS Session to rollback
770         * @throws javax.jms.JMSException in case of a rollback error
771         */
772        protected void rollbackIfNecessary(Session session) throws JMSException {
773                if (session.getTransacted()) {
774                        if (isSessionLocallyTransacted(session)) {
775                                // Transacted session created by this container -> rollback.
776                                JmsUtils.rollbackIfNecessary(session);
777                        }
778                }
779                else if (isClientAcknowledge(session)) {
780                        session.recover();
781                }
782        }
783
784        /**
785         * Perform a rollback, handling rollback exceptions properly.
786         * @param session the JMS Session to rollback
787         * @param ex the thrown application exception or error
788         * @throws javax.jms.JMSException in case of a rollback error
789         */
790        protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException {
791                try {
792                        if (session.getTransacted()) {
793                                if (isSessionLocallyTransacted(session)) {
794                                        // Transacted session created by this container -> rollback.
795                                        if (logger.isDebugEnabled()) {
796                                                logger.debug("Initiating transaction rollback on application exception", ex);
797                                        }
798                                        JmsUtils.rollbackIfNecessary(session);
799                                }
800                        }
801                        else if (isClientAcknowledge(session)) {
802                                session.recover();
803                        }
804                }
805                catch (IllegalStateException ex2) {
806                        logger.debug("Could not roll back because Session already closed", ex2);
807                }
808                catch (JMSException ex2) {
809                        logger.error("Application exception overridden by rollback exception", ex);
810                        throw ex2;
811                }
812                catch (RuntimeException ex2) {
813                        logger.error("Application exception overridden by rollback exception", ex);
814                        throw ex2;
815                }
816                catch (Error err) {
817                        logger.error("Application exception overridden by rollback error", ex);
818                        throw err;
819                }
820        }
821
822        /**
823         * Check whether the given Session is locally transacted, that is, whether
824         * its transaction is managed by this listener container's Session handling
825         * and not by an external transaction coordinator.
826         * <p>Note: The Session's own transacted flag will already have been checked
827         * before. This method is about finding out whether the Session's transaction
828         * is local or externally coordinated.
829         * @param session the Session to check
830         * @return whether the given Session is locally transacted
831         * @see #isSessionTransacted()
832         * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
833         */
834        protected boolean isSessionLocallyTransacted(Session session) {
835                return isSessionTransacted();
836        }
837
838        /**
839         * Create a JMS MessageConsumer for the given Session and Destination.
840         * <p>This implementation uses JMS 1.1 API.
841         * @param session the JMS Session to create a MessageConsumer for
842         * @param destination the JMS Destination to create a MessageConsumer for
843         * @return the new JMS MessageConsumer
844         * @throws javax.jms.JMSException if thrown by JMS API methods
845         */
846        protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
847                if (isPubSubDomain() && destination instanceof Topic) {
848                        if (isSubscriptionShared()) {
849                                // createSharedConsumer((Topic) dest, subscription, selector);
850                                // createSharedDurableConsumer((Topic) dest, subscription, selector);
851                                Method method = (isSubscriptionDurable() ?
852                                                createSharedDurableConsumerMethod : createSharedConsumerMethod);
853                                try {
854                                        return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
855                                }
856                                catch (InvocationTargetException ex) {
857                                        if (ex.getTargetException() instanceof JMSException) {
858                                                throw (JMSException) ex.getTargetException();
859                                        }
860                                        ReflectionUtils.handleInvocationTargetException(ex);
861                                        return null;
862                                }
863                                catch (IllegalAccessException ex) {
864                                        throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage());
865                                }
866                        }
867                        else if (isSubscriptionDurable()) {
868                                return session.createDurableSubscriber(
869                                                (Topic) destination, getSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
870                        }
871                        else {
872                                // Only pass in the NoLocal flag in case of a Topic (pub-sub mode):
873                                // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
874                                // in case of the NoLocal flag being specified for a Queue.
875                                return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
876                        }
877                }
878                else {
879                        return session.createConsumer(destination, getMessageSelector());
880                }
881        }
882
883        /**
884         * Handle the given exception that arose during listener execution.
885         * <p>The default implementation logs the exception at warn level,
886         * not propagating it to the JMS provider &mdash; assuming that all handling of
887         * acknowledgement and/or transactions is done by this listener container.
888         * This can be overridden in subclasses.
889         * @param ex the exception to handle
890         */
891        protected void handleListenerException(Throwable ex) {
892                if (ex instanceof MessageRejectedWhileStoppingException) {
893                        // Internal exception - has been handled before.
894                        return;
895                }
896                if (ex instanceof JMSException) {
897                        invokeExceptionListener((JMSException) ex);
898                }
899                if (isActive()) {
900                        // Regular case: failed while active.
901                        // Invoke ErrorHandler if available.
902                        invokeErrorHandler(ex);
903                }
904                else {
905                        // Rare case: listener thread failed after container shutdown.
906                        // Log at debug level, to avoid spamming the shutdown log.
907                        logger.debug("Listener exception after container shutdown", ex);
908                }
909        }
910
911        /**
912         * Invoke the registered JMS ExceptionListener, if any.
913         * @param ex the exception that arose during JMS processing
914         * @see #setExceptionListener
915         */
916        protected void invokeExceptionListener(JMSException ex) {
917                ExceptionListener exceptionListener = getExceptionListener();
918                if (exceptionListener != null) {
919                        exceptionListener.onException(ex);
920                }
921        }
922
923        /**
924         * Invoke the registered ErrorHandler, if any. Log at warn level otherwise.
925         * @param ex the uncaught error that arose during JMS processing.
926         * @see #setErrorHandler
927         */
928        protected void invokeErrorHandler(Throwable ex) {
929                ErrorHandler errorHandler = getErrorHandler();
930                if (errorHandler != null) {
931                        errorHandler.handleError(ex);
932                }
933                else {
934                        logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex);
935                }
936        }
937
938
939        /**
940         * Internal exception class that indicates a rejected message on shutdown.
941         * Used to trigger a rollback for an external transaction manager in that case.
942         */
943        @SuppressWarnings("serial")
944        private static class MessageRejectedWhileStoppingException extends RuntimeException {
945        }
946
947}