001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.ExceptionListener;
022import javax.jms.JMSException;
023import javax.jms.Message;
024import javax.jms.MessageConsumer;
025import javax.jms.MessageListener;
026import javax.jms.Queue;
027import javax.jms.Session;
028import javax.jms.Topic;
029
030import org.springframework.jms.support.JmsUtils;
031import org.springframework.jms.support.QosSettings;
032import org.springframework.jms.support.converter.MessageConverter;
033import org.springframework.lang.Nullable;
034import org.springframework.util.ErrorHandler;
035
036/**
037 * Abstract base class for Spring message listener container implementations.
038 * Can either host a standard JMS {@link javax.jms.MessageListener} or Spring's
039 * {@link SessionAwareMessageListener} for actual message processing.
040 *
041 * <p>Usually holds a single JMS {@link Connection} that all listeners are supposed
042 * to be registered on, which is the standard JMS way of managing listener sessions.
043 * Can alternatively also be used with a fresh Connection per listener, for Java EE
044 * style XA-aware JMS messaging. The actual registration process is up to concrete
045 * subclasses.
046 *
047 * <p><b>NOTE:</b> The default behavior of this message listener container is to
048 * <b>never</b> propagate an exception thrown by a message listener up to the JMS
049 * provider. Instead, it will log any such exception at the error level.
050 * This means that from the perspective of the attendant JMS provider no such
051 * listener will ever fail. However, if error handling is necessary, then
052 * any implementation of the {@link ErrorHandler} strategy may be provided to
053 * the {@link #setErrorHandler(ErrorHandler)} method. Note that JMSExceptions
054 * <b>will</b> be passed to the ErrorHandler in addition to (but after) being
055 * passed to an {@link ExceptionListener}, if one has been provided.
056 *
057 * <p>The listener container offers the following message acknowledgment options:
058 * <ul>
059 * <li>"sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default):
060 * This mode is container-dependent: For {@link DefaultMessageListenerContainer},
061 * it means automatic message acknowledgment <i>before</i> listener execution, with
062 * no redelivery in case of an exception and no redelivery in case of other listener
063 * execution interruptions either. For {@link SimpleMessageListenerContainer},
064 * it means automatic message acknowledgment <i>after</i> listener execution, with
065 * no redelivery in case of a user exception thrown but potential redelivery in case
066 * of the JVM dying during listener execution. In order to consistently arrange for
067 * redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or -
068 * preferably - setting "sessionTransacted" to "true" instead.
069 * <li>"sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE":
070 * <i>Lazy</i> message acknowledgment during ({@link DefaultMessageListenerContainer})
071 * or shortly after ({@link SimpleMessageListenerContainer}) listener execution;
072 * no redelivery in case of a user exception thrown but potential redelivery in case
073 * of the JVM dying during listener execution. In order to consistently arrange for
074 * redelivery with any container variant, consider "CLIENT_ACKNOWLEDGE" mode or -
075 * preferably - setting "sessionTransacted" to "true" instead.
076 * <li>"sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE":
077 * Automatic message acknowledgment <i>after</i> successful listener execution;
078 * best-effort redelivery in case of a user exception thrown as well as in case
079 * of other listener execution interruptions (such as the JVM dying).
080 * <li>"sessionTransacted" set to "true":
081 * Transactional acknowledgment after successful listener execution;
082 * <i>guaranteed redelivery</i> in case of a user exception thrown as well as
083 * in case of other listener execution interruptions (such as the JVM dying).
084 * </ul>
085 *
086 * <p>There are two solutions to the duplicate message processing problem:
087 * <ul>
088 * <li>Either add <i>duplicate message detection</i> to your listener, in the
089 * form of a business entity existence check or a protocol table check. This
090 * usually just needs to be done in case of the JMSRedelivered flag being
091 * set on the incoming message (otherwise just process straightforwardly).
092 * Note that with "sessionTransacted" set to "true", duplicate messages will
093 * only appear in case of the JVM dying at the most unfortunate point possible
094 * (i.e. after your business logic executed but before the JMS part got committed),
095 * so duplicate message detection is just there to cover a corner case.
096 * <li>Or wrap your <i>entire processing with an XA transaction</i>, covering the
097 * reception of the JMS message as well as the execution of the business logic in
098 * your message listener (including database operations etc). This is only
099 * supported by {@link DefaultMessageListenerContainer}, through specifying
100 * an external "transactionManager" (typically a
101 * {@link org.springframework.transaction.jta.JtaTransactionManager}, with
102 * a corresponding XA-aware JMS {@link javax.jms.ConnectionFactory} passed in
103 * as "connectionFactory").
104 * </ul>
 * Note that XA transaction coordination adds significant runtime overhead,
 * so it is advisable to avoid it unless absolutely necessary.
107 *
108 * <p><b>Recommendations:</b>
109 * <ul>
110 * <li>The general recommendation is to set "sessionTransacted" to "true",
111 * typically in combination with local database transactions triggered by the
112 * listener implementation, through Spring's standard transaction facilities.
113 * This will work nicely in Tomcat or in a standalone environment, often
114 * combined with custom duplicate message detection (if it is unacceptable
115 * to ever process the same message twice).
116 * <li>Alternatively, specify a
117 * {@link org.springframework.transaction.jta.JtaTransactionManager} as
118 * "transactionManager" for a fully XA-aware JMS provider - typically when
119 * running on a Java EE server, but also for other environments with a JTA
120 * transaction manager present. This will give full "exactly-once" guarantees
121 * without custom duplicate message checks, at the price of additional
122 * runtime processing overhead.
123 * </ul>
124 *
125 * <p>Note that the "sessionTransacted" flag is strongly recommended over
126 * {@link org.springframework.jms.connection.JmsTransactionManager}, provided
127 * that transactions do not need to be managed externally. As a consequence,
128 * set the transaction manager only if you are using JTA or if you need to
129 * synchronize with custom external transaction arrangements.
130 *
131 * @author Juergen Hoeller
132 * @author Stephane Nicoll
133 * @since 2.0
134 * @see #setMessageListener
135 * @see javax.jms.MessageListener
136 * @see SessionAwareMessageListener
137 * @see #handleListenerException
138 * @see DefaultMessageListenerContainer
139 * @see SimpleMessageListenerContainer
140 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
141 */
142public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer
143                implements MessageListenerContainer {
144
        // Either a JMS Destination object or a destination name String,
        // depending on which setter was used (setDestination / setDestinationName).
        @Nullable
        private volatile Object destination;

        // JMS message selector expression, or null if none.
        @Nullable
        private volatile String messageSelector;

        // The listener to register; validated by checkMessageListener on assignment.
        @Nullable
        private volatile Object messageListener;

        // Whether to make the subscription durable.
        private boolean subscriptionDurable = false;

        // Whether to make the subscription shared.
        private boolean subscriptionShared = false;

        // Name of the subscription to create; defaulted from the listener
        // in setMessageListener if still null at that point.
        @Nullable
        private String subscriptionName;

        // Explicit reply domain; null means "fall back to isPubSubDomain()".
        @Nullable
        private Boolean replyPubSubDomain;

        // QoS settings to use when sending a reply, or null if none configured.
        @Nullable
        private QosSettings replyQosSettings;

        // Whether to inhibit delivery of messages published by the container's own connection.
        private boolean pubSubNoLocal = false;

        // Strategy for converting JMS Messages, if any.
        @Nullable
        private MessageConverter messageConverter;

        // JMS ExceptionListener to notify in case of a JMSException, if any.
        @Nullable
        private ExceptionListener exceptionListener;

        // Handler for uncaught exceptions thrown while processing a Message, if any.
        @Nullable
        private ErrorHandler errorHandler;

        // Whether to expose the listener's own JMS Session (default: true).
        private boolean exposeListenerSession = true;

        // Whether received messages are still accepted while the container is stopping.
        private boolean acceptMessagesWhileStopping = false;
181
182
        /**
         * Specify concurrency limits.
         * <p>This method is abstract: how the given String is interpreted
         * is left to the concrete subclass.
         * @param concurrency the concurrency limits in String form
         */
        public abstract void setConcurrency(String concurrency);
187
188        /**
189         * Set the destination to receive messages from.
190         * <p>Alternatively, specify a "destinationName", to be dynamically
191         * resolved via the {@link org.springframework.jms.support.destination.DestinationResolver}.
192         * <p>Note: The destination may be replaced at runtime, with the listener
193         * container picking up the new destination immediately (works e.g. with
194         * DefaultMessageListenerContainer, as long as the cache level is less than
195         * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
196         * @see #setDestinationName(String)
197         */
198        public void setDestination(@Nullable Destination destination) {
199                this.destination = destination;
200                if (destination instanceof Topic && !(destination instanceof Queue)) {
201                        // Clearly a Topic: let's set the "pubSubDomain" flag accordingly.
202                        setPubSubDomain(true);
203                }
204        }
205
206        /**
207         * Return the destination to receive messages from. Will be {@code null}
208         * if the configured destination is not an actual {@link Destination} type;
209         * c.f. {@link #setDestinationName(String) when the destination is a String}.
210         */
211        @Nullable
212        public Destination getDestination() {
213                return (this.destination instanceof Destination ? (Destination) this.destination : null);
214        }
215
        /**
         * Set the name of the destination to receive messages from.
         * <p>The specified name will be dynamically resolved via the configured
         * {@link #setDestinationResolver destination resolver}.
         * <p>Alternatively, specify a JMS {@link Destination} object as "destination".
         * The name is stored in the same field as a Destination object would be,
         * so the most recent call to either setter wins.
         * <p>Note: The destination may be replaced at runtime, with the listener
         * container picking up the new destination immediately (works e.g. with
         * DefaultMessageListenerContainer, as long as the cache level is less than
         * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
         * @param destinationName the name of the destination (may be {@code null})
         * @see #setDestination(javax.jms.Destination)
         */
        public void setDestinationName(@Nullable String destinationName) {
                this.destination = destinationName;
        }
230
231        /**
232         * Return the name of the destination to receive messages from.
233         * Will be {@code null} if the configured destination is not a
234         * {@link String} type; c.f. {@link #setDestination(Destination) when
235         * it is an actual Destination}.
236         */
237        @Nullable
238        public String getDestinationName() {
239                return (this.destination instanceof String ? (String) this.destination : null);
240        }
241
242        /**
243         * Return a descriptive String for this container's JMS destination
244         * (never {@code null}).
245         */
246        protected String getDestinationDescription() {
247                Object destination = this.destination;
248                return (destination != null ? destination.toString() : "");
249        }
250
        /**
         * Set the JMS message selector expression (or {@code null} if none).
         * Default is none.
         * <p>See the JMS specification for a detailed definition of selector expressions.
         * <p>Note: The message selector may be replaced at runtime, with the listener
         * container picking up the new selector value immediately (works e.g. with
         * DefaultMessageListenerContainer, as long as the cache level is less than
         * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
         * @param messageSelector the selector expression (may be {@code null})
         */
        public void setMessageSelector(@Nullable String messageSelector) {
                this.messageSelector = messageSelector;
        }
263
        /**
         * Return the JMS message selector expression (or {@code null} if none).
         * @return the configured selector expression, or {@code null}
         */
        @Nullable
        public String getMessageSelector() {
                return this.messageSelector;
        }
271
272
273        /**
274         * Set the message listener implementation to register.
275         * This can be either a standard JMS {@link MessageListener} object
276         * or a Spring {@link SessionAwareMessageListener} object.
277         * <p>Note: The message listener may be replaced at runtime, with the listener
278         * container picking up the new listener object immediately (works e.g. with
279         * DefaultMessageListenerContainer, as long as the cache level is less than
280         * CACHE_CONSUMER). However, this is considered advanced usage; use it with care!
281         * @throws IllegalArgumentException if the supplied listener is not a
282         * {@link MessageListener} or a {@link SessionAwareMessageListener}
283         * @see javax.jms.MessageListener
284         * @see SessionAwareMessageListener
285         */
286        public void setMessageListener(@Nullable Object messageListener) {
287                checkMessageListener(messageListener);
288                this.messageListener = messageListener;
289                if (messageListener != null && this.subscriptionName == null) {
290                        this.subscriptionName = getDefaultSubscriptionName(messageListener);
291                }
292        }
293
        /**
         * Return the message listener object to register.
         * @return the configured listener, or {@code null} if none has been set
         */
        @Nullable
        public Object getMessageListener() {
                return this.messageListener;
        }
301
302        /**
303         * Check the given message listener, throwing an exception
304         * if it does not correspond to a supported listener type.
305         * <p>By default, only a standard JMS {@link MessageListener} object or a
306         * Spring {@link SessionAwareMessageListener} object will be accepted.
307         * @param messageListener the message listener object to check
308         * @throws IllegalArgumentException if the supplied listener is not a
309         * {@link MessageListener} or a {@link SessionAwareMessageListener}
310         * @see javax.jms.MessageListener
311         * @see SessionAwareMessageListener
312         */
313        protected void checkMessageListener(@Nullable Object messageListener) {
314                if (messageListener != null && !(messageListener instanceof MessageListener ||
315                                messageListener instanceof SessionAwareMessageListener)) {
316                        throw new IllegalArgumentException(
317                                        "Message listener needs to be of type [" + MessageListener.class.getName() +
318                                        "] or [" + SessionAwareMessageListener.class.getName() + "]");
319                }
320        }
321
322        /**
323         * Determine the default subscription name for the given message listener.
324         * @param messageListener the message listener object to check
325         * @return the default subscription name
326         * @see SubscriptionNameProvider
327         */
328        protected String getDefaultSubscriptionName(Object messageListener) {
329                if (messageListener instanceof SubscriptionNameProvider) {
330                        return ((SubscriptionNameProvider) messageListener).getSubscriptionName();
331                }
332                else {
333                        return messageListener.getClass().getName();
334                }
335        }
336
337        /**
338         * Set whether to make the subscription durable. The durable subscription name
339         * to be used can be specified through the "subscriptionName" property.
340         * <p>Default is "false". Set this to "true" to register a durable subscription,
341         * typically in combination with a "subscriptionName" value (unless
342         * your message listener class name is good enough as subscription name).
343         * <p>Only makes sense when listening to a topic (pub-sub domain),
344         * therefore this method switches the "pubSubDomain" flag as well.
345         * @see #setSubscriptionName
346         * @see #setPubSubDomain
347         */
348        public void setSubscriptionDurable(boolean subscriptionDurable) {
349                this.subscriptionDurable = subscriptionDurable;
350                if (subscriptionDurable) {
351                        setPubSubDomain(true);
352                }
353        }
354
        /**
         * Return whether to make the subscription durable.
         * @return the current value of the "subscriptionDurable" flag
         */
        public boolean isSubscriptionDurable() {
                return this.subscriptionDurable;
        }
361
362        /**
363         * Set whether to make the subscription shared. The shared subscription name
364         * to be used can be specified through the "subscriptionName" property.
365         * <p>Default is "false". Set this to "true" to register a shared subscription,
366         * typically in combination with a "subscriptionName" value (unless
367         * your message listener class name is good enough as subscription name).
368         * Note that shared subscriptions may also be durable, so this flag can
369         * (and often will) be combined with "subscriptionDurable" as well.
370         * <p>Only makes sense when listening to a topic (pub-sub domain),
371         * therefore this method switches the "pubSubDomain" flag as well.
372         * <p><b>Requires a JMS 2.0 compatible message broker.</b>
373         * @since 4.1
374         * @see #setSubscriptionName
375         * @see #setSubscriptionDurable
376         * @see #setPubSubDomain
377         */
378        public void setSubscriptionShared(boolean subscriptionShared) {
379                this.subscriptionShared = subscriptionShared;
380                if (subscriptionShared) {
381                        setPubSubDomain(true);
382                }
383        }
384
        /**
         * Return whether to make the subscription shared.
         * @return the current value of the "subscriptionShared" flag
         * @since 4.1
         */
        public boolean isSubscriptionShared() {
                return this.subscriptionShared;
        }
392
        /**
         * Set the name of a subscription to create. To be applied in case
         * of a topic (pub-sub domain) with a shared or durable subscription.
         * <p>The subscription name needs to be unique within this client's
         * JMS client id. Default is the class name of the specified message listener.
         * <p>Note: Only 1 concurrent consumer (which is the default of this
         * message listener container) is allowed for each subscription,
         * except for a shared subscription (which requires JMS 2.0).
         * <p>This setter only stores the name; it does not touch the
         * "subscriptionDurable", "subscriptionShared" or "pubSubDomain" flags.
         * @param subscriptionName the subscription name (may be {@code null})
         * @since 4.1
         * @see #setPubSubDomain
         * @see #setSubscriptionDurable
         * @see #setSubscriptionShared
         * @see #setClientId
         * @see #setMessageListener
         */
        public void setSubscriptionName(@Nullable String subscriptionName) {
                this.subscriptionName = subscriptionName;
        }
411
        /**
         * Return the name of a subscription to create, if any.
         * @return the subscription name, or {@code null} if none is set
         * @since 4.1
         */
        @Nullable
        public String getSubscriptionName() {
                return this.subscriptionName;
        }
420
421        /**
422         * Set the name of a durable subscription to create. This method switches
423         * to pub-sub domain mode and activates subscription durability as well.
424         * <p>The durable subscription name needs to be unique within this client's
425         * JMS client id. Default is the class name of the specified message listener.
426         * <p>Note: Only 1 concurrent consumer (which is the default of this
427         * message listener container) is allowed for each durable subscription,
428         * except for a shared durable subscription (which requires JMS 2.0).
429         * @see #setPubSubDomain
430         * @see #setSubscriptionDurable
431         * @see #setSubscriptionShared
432         * @see #setClientId
433         * @see #setMessageListener
434         */
435        public void setDurableSubscriptionName(@Nullable String durableSubscriptionName) {
436                this.subscriptionName = durableSubscriptionName;
437                this.subscriptionDurable = (durableSubscriptionName != null);
438        }
439
440        /**
441         * Return the name of a durable subscription to create, if any.
442         */
443        @Nullable
444        public String getDurableSubscriptionName() {
445                return (this.subscriptionDurable ? this.subscriptionName : null);
446        }
447
        /**
         * Set whether to inhibit the delivery of messages published by its own connection.
         * Default is "false".
         * @param pubSubNoLocal whether to inhibit delivery of locally published messages
         * @since 4.1
         * @see javax.jms.Session#createConsumer(javax.jms.Destination, String, boolean)
         */
        public void setPubSubNoLocal(boolean pubSubNoLocal) {
                this.pubSubNoLocal = pubSubNoLocal;
        }
457
        /**
         * Return whether to inhibit the delivery of messages published by its own connection.
         * @return the current value of the "pubSubNoLocal" flag
         * @since 4.1
         */
        public boolean isPubSubNoLocal() {
                return this.pubSubNoLocal;
        }
465
        /**
         * Configure the reply destination type. By default, the configured {@code pubSubDomain}
         * value is used (see {@link #isPubSubDomain()}).
         * <p>This setting primarily indicates what type of destination to resolve if dynamic
         * destinations are enabled.
         * @param replyPubSubDomain "true" for the Publish/Subscribe domain ({@link Topic Topics}),
         * "false" for the Point-to-Point domain ({@link Queue Queues})
         * @since 4.2
         * @see #setDestinationResolver
         */
        public void setReplyPubSubDomain(boolean replyPubSubDomain) {
                this.replyPubSubDomain = replyPubSubDomain;
        }
479
480        /**
481         * Return whether the Publish/Subscribe domain ({@link javax.jms.Topic Topics}) is used
482         * for replies. Otherwise, the Point-to-Point domain ({@link javax.jms.Queue Queues})
483         * is used.
484         * @since 4.2
485         */
486        @Override
487        public boolean isReplyPubSubDomain() {
488                if (this.replyPubSubDomain != null) {
489                        return this.replyPubSubDomain;
490                }
491                else {
492                        return isPubSubDomain();
493                }
494        }
495
        /**
         * Configure the {@link QosSettings} to use when sending a reply. Can be set to
         * {@code null} to indicate that the broker's defaults should be used.
         * @param replyQosSettings the QoS settings to use when sending a reply, or
         * {@code null} to use the default values
         * @since 5.0
         */
        public void setReplyQosSettings(@Nullable QosSettings replyQosSettings) {
                this.replyQosSettings = replyQosSettings;
        }
506
        /**
         * Return the {@link QosSettings} to use when sending a reply, if any.
         * @return the configured settings, or {@code null} if none have been set
         */
        @Override
        @Nullable
        public QosSettings getReplyQosSettings() {
                return this.replyQosSettings;
        }
512
        /**
         * Set the {@link MessageConverter} strategy for converting JMS Messages.
         * @param messageConverter the converter to use (may be {@code null})
         * @since 4.1
         */
        public void setMessageConverter(@Nullable MessageConverter messageConverter) {
                this.messageConverter = messageConverter;
        }
520
        /**
         * Return the {@link MessageConverter} strategy for converting JMS Messages, if any.
         * @return the configured converter, or {@code null} if none has been set
         */
        @Override
        @Nullable
        public MessageConverter getMessageConverter() {
                return this.messageConverter;
        }
526
        /**
         * Set the JMS ExceptionListener to notify in case of a JMSException thrown
         * by the registered message listener or the invocation infrastructure.
         * @param exceptionListener the listener to notify (may be {@code null})
         */
        public void setExceptionListener(@Nullable ExceptionListener exceptionListener) {
                this.exceptionListener = exceptionListener;
        }
534
        /**
         * Return the JMS ExceptionListener to notify in case of a JMSException thrown
         * by the registered message listener or the invocation infrastructure, if any.
         * @return the configured listener, or {@code null} if none has been set
         */
        @Nullable
        public ExceptionListener getExceptionListener() {
                return this.exceptionListener;
        }
543
        /**
         * Set the ErrorHandler to be invoked in case of any uncaught exceptions thrown
         * while processing a Message.
         * <p>By default, there will be <b>no</b> ErrorHandler so that error-level
         * logging is the only result.
         * @param errorHandler the handler to invoke (may be {@code null})
         */
        public void setErrorHandler(@Nullable ErrorHandler errorHandler) {
                this.errorHandler = errorHandler;
        }
553
        /**
         * Return the ErrorHandler to be invoked in case of any uncaught exceptions thrown
         * while processing a Message.
         * @return the configured handler, or {@code null} if none has been set
         * @since 4.1
         */
        @Nullable
        public ErrorHandler getErrorHandler() {
                return this.errorHandler;
        }
563
        /**
         * Set whether to expose the listener JMS Session to a registered
         * {@link SessionAwareMessageListener} as well as to
         * {@link org.springframework.jms.core.JmsTemplate} calls.
         * <p>Default is "true", reusing the listener's {@link Session}.
         * Turn this off to expose a fresh JMS Session fetched from the same
         * underlying JMS {@link Connection} instead, which might be necessary
         * on some JMS providers.
         * <p>Note that Sessions managed by an external transaction manager will
         * always get exposed to {@link org.springframework.jms.core.JmsTemplate}
         * calls. So in terms of JmsTemplate exposure, this setting only affects
         * locally transacted Sessions.
         * @param exposeListenerSession whether to expose the listener's own Session
         * @see SessionAwareMessageListener
         */
        public void setExposeListenerSession(boolean exposeListenerSession) {
                this.exposeListenerSession = exposeListenerSession;
        }
581
        /**
         * Return whether to expose the listener JMS {@link Session} to a
         * registered {@link SessionAwareMessageListener}.
         * @return the current value of the "exposeListenerSession" flag
         */
        public boolean isExposeListenerSession() {
                return this.exposeListenerSession;
        }
589
        /**
         * Set whether to accept received messages while the listener container
         * is in the process of stopping.
         * <p>Default is "false", rejecting such messages through aborting the
         * receive attempt. Switch this flag on to fully process such messages
         * even in the stopping phase, with the drawback that even newly sent
         * messages might still get processed (if coming in before all receive
         * timeouts have expired).
         * <p><b>NOTE:</b> Aborting receive attempts for such incoming messages
         * might lead to the provider's retry count decreasing for the affected
         * messages. If you have a high number of concurrent consumers, make sure
         * that the number of retries is higher than the number of consumers,
         * to be on the safe side for all potential stopping scenarios.
         * @param acceptMessagesWhileStopping whether to keep accepting messages
         * while stopping
         */
        public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
                this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
        }
607
        /**
         * Return whether to accept received messages while the listener container
         * is in the process of stopping.
         * @return the current value of the "acceptMessagesWhileStopping" flag
         */
        public boolean isAcceptMessagesWhileStopping() {
                return this.acceptMessagesWhileStopping;
        }
615
616        @Override
617        protected void validateConfiguration() {
618                if (this.destination == null) {
619                        throw new IllegalArgumentException("Property 'destination' or 'destinationName' is required");
620                }
621        }
622
        /**
         * Register the given message listener by delegating to
         * {@link #setMessageListener(Object)}, which also validates its type.
         * @param messageListener the listener object to register
         */
        @Override
        public void setupMessageListener(Object messageListener) {
                setMessageListener(messageListener);
        }
627
628
629        //-------------------------------------------------------------------------
630        // Template methods for listener execution
631        //-------------------------------------------------------------------------
632
        /**
         * Execute the specified listener,
         * committing or rolling back the transaction afterwards (if necessary).
         * <p>The actual work is delegated to {@link #doExecuteListener}; any
         * {@link Throwable} it throws is caught here and handed to
         * {@link #handleListenerException} instead of being propagated directly.
         * @param session the JMS Session to operate on
         * @param message the received JMS Message
         * @see #invokeListener
         * @see #commitIfNecessary
         * @see #rollbackOnExceptionIfNecessary
         * @see #handleListenerException
         */
        protected void executeListener(Session session, Message message) {
                try {
                        doExecuteListener(session, message);
                }
                catch (Throwable ex) {
                        // Deliberately broad: every failure goes through the common handler.
                        handleListenerException(ex);
                }
        }
651
652        /**
653         * Execute the specified listener,
654         * committing or rolling back the transaction afterwards (if necessary).
655         * @param session the JMS Session to operate on
656         * @param message the received JMS Message
657         * @throws JMSException if thrown by JMS API methods
658         * @see #invokeListener
659         * @see #commitIfNecessary
660         * @see #rollbackOnExceptionIfNecessary
661         * @see #convertJmsAccessException
662         */
663        protected void doExecuteListener(Session session, Message message) throws JMSException {
664                if (!isAcceptMessagesWhileStopping() && !isRunning()) {
665                        if (logger.isWarnEnabled()) {
666                                logger.warn("Rejecting received message because of the listener container " +
667                                                "having been stopped in the meantime: " + message);
668                        }
669                        rollbackIfNecessary(session);
670                        throw new MessageRejectedWhileStoppingException();
671                }
672
673                try {
674                        invokeListener(session, message);
675                }
676                catch (JMSException | RuntimeException | Error ex) {
677                        rollbackOnExceptionIfNecessary(session, ex);
678                        throw ex;
679                }
680                commitIfNecessary(session, message);
681        }
682
683        /**
684         * Invoke the specified listener: either as standard JMS MessageListener
685         * or (preferably) as Spring SessionAwareMessageListener.
686         * @param session the JMS Session to operate on
687         * @param message the received JMS Message
688         * @throws JMSException if thrown by JMS API methods
689         * @see #setMessageListener
690         */
691        @SuppressWarnings("rawtypes")
692        protected void invokeListener(Session session, Message message) throws JMSException {
693                Object listener = getMessageListener();
694
695                if (listener instanceof SessionAwareMessageListener) {
696                        doInvokeListener((SessionAwareMessageListener) listener, session, message);
697                }
698                else if (listener instanceof MessageListener) {
699                        doInvokeListener((MessageListener) listener, message);
700                }
701                else if (listener != null) {
702                        throw new IllegalArgumentException(
703                                        "Only MessageListener and SessionAwareMessageListener supported: " + listener);
704                }
705                else {
706                        throw new IllegalStateException("No message listener specified - see property 'messageListener'");
707                }
708        }
709
710        /**
711         * Invoke the specified listener as Spring SessionAwareMessageListener,
712         * exposing a new JMS Session (potentially with its own transaction)
713         * to the listener if demanded.
714         * @param listener the Spring SessionAwareMessageListener to invoke
715         * @param session the JMS Session to operate on
716         * @param message the received JMS Message
717         * @throws JMSException if thrown by JMS API methods
718         * @see SessionAwareMessageListener
719         * @see #setExposeListenerSession
720         */
721        @SuppressWarnings({"rawtypes", "unchecked"})
722        protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message)
723                        throws JMSException {
724
725                Connection conToClose = null;
726                Session sessionToClose = null;
727                try {
728                        Session sessionToUse = session;
729                        if (!isExposeListenerSession()) {
730                                // We need to expose a separate Session.
731                                conToClose = createConnection();
732                                sessionToClose = createSession(conToClose);
733                                sessionToUse = sessionToClose;
734                        }
735                        // Actually invoke the message listener...
736                        listener.onMessage(message, sessionToUse);
737                        // Clean up specially exposed Session, if any.
738                        if (sessionToUse != session) {
739                                if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) {
740                                        // Transacted session created by this container -> commit.
741                                        JmsUtils.commitIfNecessary(sessionToUse);
742                                }
743                        }
744                }
745                finally {
746                        JmsUtils.closeSession(sessionToClose);
747                        JmsUtils.closeConnection(conToClose);
748                }
749        }
750
751        /**
752         * Invoke the specified listener as standard JMS MessageListener.
753         * <p>Default implementation performs a plain invocation of the
754         * {@code onMessage} method.
755         * @param listener the JMS MessageListener to invoke
756         * @param message the received JMS Message
757         * @throws JMSException if thrown by JMS API methods
758         * @see javax.jms.MessageListener#onMessage
759         */
760        protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {
761                listener.onMessage(message);
762        }
763
764        /**
765         * Perform a commit or message acknowledgement, as appropriate.
766         * @param session the JMS Session to commit
767         * @param message the Message to acknowledge
768         * @throws javax.jms.JMSException in case of commit failure
769         */
770        protected void commitIfNecessary(Session session, @Nullable Message message) throws JMSException {
771                // Commit session or acknowledge message.
772                if (session.getTransacted()) {
773                        // Commit necessary - but avoid commit call within a JTA transaction.
774                        if (isSessionLocallyTransacted(session)) {
775                                // Transacted session created by this container -> commit.
776                                JmsUtils.commitIfNecessary(session);
777                        }
778                }
779                else if (message != null && isClientAcknowledge(session)) {
780                        message.acknowledge();
781                }
782        }
783
784        /**
785         * Perform a rollback, if appropriate.
786         * @param session the JMS Session to rollback
787         * @throws javax.jms.JMSException in case of a rollback error
788         */
789        protected void rollbackIfNecessary(Session session) throws JMSException {
790                if (session.getTransacted()) {
791                        if (isSessionLocallyTransacted(session)) {
792                                // Transacted session created by this container -> rollback.
793                                JmsUtils.rollbackIfNecessary(session);
794                        }
795                }
796                else if (isClientAcknowledge(session)) {
797                        session.recover();
798                }
799        }
800
801        /**
802         * Perform a rollback, handling rollback exceptions properly.
803         * @param session the JMS Session to rollback
804         * @param ex the thrown application exception or error
805         * @throws javax.jms.JMSException in case of a rollback error
806         */
807        protected void rollbackOnExceptionIfNecessary(Session session, Throwable ex) throws JMSException {
808                try {
809                        if (session.getTransacted()) {
810                                if (isSessionLocallyTransacted(session)) {
811                                        // Transacted session created by this container -> rollback.
812                                        if (logger.isDebugEnabled()) {
813                                                logger.debug("Initiating transaction rollback on application exception", ex);
814                                        }
815                                        JmsUtils.rollbackIfNecessary(session);
816                                }
817                        }
818                        else if (isClientAcknowledge(session)) {
819                                session.recover();
820                        }
821                }
822                catch (IllegalStateException ex2) {
823                        logger.debug("Could not roll back because Session already closed", ex2);
824                }
825                catch (JMSException | RuntimeException | Error ex2) {
826                        logger.error("Application exception overridden by rollback error", ex);
827                        throw ex2;
828                }
829        }
830
831        /**
832         * Check whether the given Session is locally transacted, that is, whether
833         * its transaction is managed by this listener container's Session handling
834         * and not by an external transaction coordinator.
835         * <p>Note: The Session's own transacted flag will already have been checked
836         * before. This method is about finding out whether the Session's transaction
837         * is local or externally coordinated.
838         * @param session the Session to check
839         * @return whether the given Session is locally transacted
840         * @see #isSessionTransacted()
841         * @see org.springframework.jms.connection.ConnectionFactoryUtils#isSessionTransactional
842         */
843        protected boolean isSessionLocallyTransacted(Session session) {
844                return isSessionTransacted();
845        }
846
847        /**
848         * Create a JMS MessageConsumer for the given Session and Destination.
849         * <p>This implementation uses JMS 1.1 API.
850         * @param session the JMS Session to create a MessageConsumer for
851         * @param destination the JMS Destination to create a MessageConsumer for
852         * @return the new JMS MessageConsumer
853         * @throws javax.jms.JMSException if thrown by JMS API methods
854         */
855        protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
856                if (isPubSubDomain() && destination instanceof Topic) {
857                        if (isSubscriptionShared()) {
858                                return (isSubscriptionDurable() ?
859                                                session.createSharedDurableConsumer((Topic) destination, getSubscriptionName(), getMessageSelector()) :
860                                                session.createSharedConsumer((Topic) destination, getSubscriptionName(), getMessageSelector()));
861                        }
862                        else if (isSubscriptionDurable()) {
863                                return session.createDurableSubscriber(
864                                                (Topic) destination, getSubscriptionName(), getMessageSelector(), isPubSubNoLocal());
865                        }
866                        else {
867                                // Only pass in the NoLocal flag in case of a Topic (pub-sub mode):
868                                // Some JMS providers, such as WebSphere MQ 6.0, throw IllegalStateException
869                                // in case of the NoLocal flag being specified for a Queue.
870                                return session.createConsumer(destination, getMessageSelector(), isPubSubNoLocal());
871                        }
872                }
873                else {
874                        return session.createConsumer(destination, getMessageSelector());
875                }
876        }
877
878        /**
879         * Handle the given exception that arose during listener execution.
880         * <p>The default implementation logs the exception at warn level,
881         * not propagating it to the JMS provider &mdash; assuming that all handling of
882         * acknowledgement and/or transactions is done by this listener container.
883         * This can be overridden in subclasses.
884         * @param ex the exception to handle
885         */
886        protected void handleListenerException(Throwable ex) {
887                if (ex instanceof MessageRejectedWhileStoppingException) {
888                        // Internal exception - has been handled before.
889                        return;
890                }
891                if (ex instanceof JMSException) {
892                        invokeExceptionListener((JMSException) ex);
893                }
894                if (isActive()) {
895                        // Regular case: failed while active.
896                        // Invoke ErrorHandler if available.
897                        invokeErrorHandler(ex);
898                }
899                else {
900                        // Rare case: listener thread failed after container shutdown.
901                        // Log at debug level, to avoid spamming the shutdown log.
902                        logger.debug("Listener exception after container shutdown", ex);
903                }
904        }
905
906        /**
907         * Invoke the registered JMS ExceptionListener, if any.
908         * @param ex the exception that arose during JMS processing
909         * @see #setExceptionListener
910         */
911        protected void invokeExceptionListener(JMSException ex) {
912                ExceptionListener exceptionListener = getExceptionListener();
913                if (exceptionListener != null) {
914                        exceptionListener.onException(ex);
915                }
916        }
917
918        /**
919         * Invoke the registered ErrorHandler, if any. Log at warn level otherwise.
920         * @param ex the uncaught error that arose during JMS processing.
921         * @see #setErrorHandler
922         */
923        protected void invokeErrorHandler(Throwable ex) {
924                ErrorHandler errorHandler = getErrorHandler();
925                if (errorHandler != null) {
926                        errorHandler.handleError(ex);
927                }
928                else {
929                        logger.warn("Execution of JMS message listener failed, and no ErrorHandler has been set.", ex);
930                }
931        }
932
933
934        /**
935         * Internal exception class that indicates a rejected message on shutdown.
936         * Used to trigger a rollback for an external transaction manager in that case.
937         */
938        @SuppressWarnings("serial")
939        private static class MessageRejectedWhileStoppingException extends RuntimeException {
940        }
941
942}