001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import java.util.HashSet;
020import java.util.Set;
021import java.util.concurrent.Executor;
022
023import javax.jms.Connection;
024import javax.jms.JMSException;
025import javax.jms.MessageConsumer;
026import javax.jms.Session;
027
028import org.springframework.core.Constants;
029import org.springframework.core.task.SimpleAsyncTaskExecutor;
030import org.springframework.core.task.TaskExecutor;
031import org.springframework.jms.JmsException;
032import org.springframework.jms.support.JmsUtils;
033import org.springframework.jms.support.destination.CachingDestinationResolver;
034import org.springframework.jms.support.destination.DestinationResolver;
035import org.springframework.lang.Nullable;
036import org.springframework.scheduling.SchedulingAwareRunnable;
037import org.springframework.scheduling.SchedulingTaskExecutor;
038import org.springframework.util.Assert;
039import org.springframework.util.ClassUtils;
040import org.springframework.util.backoff.BackOff;
041import org.springframework.util.backoff.BackOffExecution;
042import org.springframework.util.backoff.FixedBackOff;
043
044/**
045 * Message listener container variant that uses plain JMS client APIs, specifically
046 * a loop of {@code MessageConsumer.receive()} calls that also allow for
047 * transactional reception of messages (registering them with XA transactions).
048 * Designed to work in a native JMS environment as well as in a Java EE environment,
049 * with only minimal differences in configuration.
050 *
051 * <p>This is a simple but nevertheless powerful form of message listener container.
052 * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
053 * and optionally allows for dynamic adaptation at runtime (up to a maximum number).
054 * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
055 * of runtime complexity, in particular the minimal requirements on the JMS provider:
056 * not even the JMS {@code ServerSessionPool} facility is required. Beyond that, it is
057 * fully self-recovering in case the broker is temporarily unavailable, and allows
058 * for stops/restarts as well as runtime changes to its configuration.
059 *
060 * <p>Actual {@code MessageListener} execution happens in asynchronous work units which are
061 * created through Spring's {@link org.springframework.core.task.TaskExecutor TaskExecutor}
062 * abstraction. By default, the specified number of invoker tasks will be created
063 * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
064 * setting. Specify an alternative {@code TaskExecutor} to integrate with an existing
065 * thread pool facility (such as a Java EE server's), for example using a
066 * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
067 * With a native JMS setup, each of those listener threads is going to use a
068 * cached JMS {@code Session} and {@code MessageConsumer} (only refreshed in case
069 * of failure), using the JMS provider's resources as efficiently as possible.
070 *
071 * <p>Message reception and listener execution can automatically be wrapped
072 * in transactions by passing a Spring
073 * {@link org.springframework.transaction.PlatformTransactionManager} into the
074 * {@link #setTransactionManager "transactionManager"} property. This will usually
075 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
076 * Java EE environment, in combination with a JTA-aware JMS {@code ConnectionFactory}
077 * obtained from JNDI (check your Java EE server's documentation). Note that this
078 * listener container will automatically reobtain all JMS handles for each transaction
079 * in case an external transaction manager is specified, for compatibility with
080 * all Java EE servers (in particular JBoss). This non-caching behavior can be
081 * overridden through the {@link #setCacheLevel "cacheLevel"} /
082 * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching of
083 * the {@code Connection} (or also {@code Session} and {@code MessageConsumer})
084 * even if an external transaction manager is involved.
085 *
086 * <p>Dynamic scaling of the number of concurrent invokers can be activated
087 * by specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
088 * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
089 * value. Since the latter's default is 1, you can also simply specify a
090 * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
091 * 5 concurrent consumers in case of increasing message load, as well as dynamic
092 * shrinking back to the standard number of consumers once the load decreases.
093 * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
094 * setting to control the lifespan of each new task, to avoid frequent scaling up
095 * and down, in particular if the {@code ConnectionFactory} does not pool JMS
096 * {@code Sessions} and/or the {@code TaskExecutor} does not pool threads (check
097 * your configuration!). Note that dynamic scaling only really makes sense for a
098 * queue in the first place; for a topic, you will typically stick with the default
099 * number of 1 consumer, otherwise you'd receive the same message multiple times on
100 * the same node.
101 *
102 * <p><b>Note: Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
103 * in combination with dynamic scaling.</b> Ideally, don't use it with a message
104 * listener container at all, since it is generally preferable to let the
105 * listener container itself handle appropriate caching within its lifecycle.
106 * Also, stopping and restarting a listener container will only work with an
107 * independent, locally cached Connection - not with an externally cached one.
108 *
109 * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
110 * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
111 * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
112 * javadoc for details on acknowledge modes and native transaction options, as
113 * well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
114 * on configuring an external transaction manager. Note that for the default
115 * "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
116 * before listener execution, with no redelivery in case of an exception.
117 *
118 * @author Juergen Hoeller
119 * @since 2.0
120 * @see #setTransactionManager
121 * @see #setCacheLevel
122 * @see javax.jms.MessageConsumer#receive(long)
123 * @see SimpleMessageListenerContainer
124 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
125 */
126public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
127
        /**
         * Default thread name prefix: "DefaultMessageListenerContainer-".
         */
        public static final String DEFAULT_THREAD_NAME_PREFIX =
                        ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";

        /**
         * The default recovery interval: 5000 ms = 5 seconds.
         */
        public static final long DEFAULT_RECOVERY_INTERVAL = 5000;


        /**
         * Constant that indicates to cache no JMS resources at all.
         * @see #setCacheLevel
         */
        public static final int CACHE_NONE = 0;

        /**
         * Constant that indicates to cache a shared JMS {@code Connection} for each
         * listener thread.
         * @see #setCacheLevel
         */
        public static final int CACHE_CONNECTION = 1;

        /**
         * Constant that indicates to cache a shared JMS {@code Connection} and a JMS
         * {@code Session} for each listener thread.
         * @see #setCacheLevel
         */
        public static final int CACHE_SESSION = 2;

        /**
         * Constant that indicates to cache a shared JMS {@code Connection}, a JMS
         * {@code Session}, and a JMS MessageConsumer for each listener thread.
         * @see #setCacheLevel
         */
        public static final int CACHE_CONSUMER = 3;

        /**
         * Constant that indicates automatic choice of an appropriate caching level
         * (depending on the transaction management strategy).
         * @see #setCacheLevel
         */
        public static final int CACHE_AUTO = 4;


        // Lookup helper over this class's constants; setCacheLevelName uses it to
        // translate a "CACHE_*" name into its numeric value.
        private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);


        // Executor running the listener invoker tasks. Stays null until explicitly set;
        // initialize() falls back to createDefaultTaskExecutor() in that case.
        @Nullable
        private Executor taskExecutor;

        // Strategy for the interval between recovery attempts. Defaults to a fixed
        // interval of DEFAULT_RECOVERY_INTERVAL ms with Long.MAX_VALUE as attempt limit.
        private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE);

        // Configured cache level; CACHE_AUTO is resolved to a concrete level in initialize().
        private int cacheLevel = CACHE_AUTO;

        // Lower bound of consumers; read and written under lifecycleMonitor.
        private int concurrentConsumers = 1;

        // Upper bound of consumers; the setters keep it >= concurrentConsumers.
        private int maxConcurrentConsumers = 1;

        // Integer.MIN_VALUE serves as "not explicitly configured" marker: initialize()
        // only applies its short-lived-task default of 10 while this marker is in place.
        private int maxMessagesPerTask = Integer.MIN_VALUE;

        // Limit for the number of idle consumers (see setIdleConsumerLimit).
        private int idleConsumerLimit = 1;

        // Limit for idle executions of a consumer task (see setIdleTaskExecutionLimit).
        private int idleTaskExecutionLimit = 1;

        // Invokers currently scheduled; doShutdown() iterates and finally clears this
        // set while holding lifecycleMonitor.
        private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<>();

        // Number of invokers still active; doShutdown() waits on lifecycleMonitor
        // until this has dropped to zero.
        private int activeInvokerCount = 0;

        // NOTE(review): not touched in the portion of the class reviewed here; presumably
        // counts invokers that have a consumer registered with the destination - confirm.
        private int registeredWithDestination = 0;

        // NOTE(review): usage lies outside the reviewed portion; presumably flags an
        // ongoing recovery attempt - confirm against the recovery code.
        private volatile boolean recovering = false;

        // NOTE(review): usage lies outside the reviewed portion; presumably records that
        // a recovery wait got interrupted - confirm against the recovery code.
        private volatile boolean interrupted = false;

        // Callback handed to stop(Runnable); start() resets it to null so that a restart
        // before the container has fully stopped discards the pending callback.
        @Nullable
        private Runnable stopCallback;

        // NOTE(review): usage lies outside the reviewed portion; looks like an identity
        // token for the current recovery round - confirm.
        private Object currentRecoveryMarker = new Object();

        // NOTE(review): usage lies outside the reviewed portion; presumably the lock
        // guarding recovery state - confirm.
        private final Object recoveryMonitor = new Object();
211
212
213        /**
214         * Set the Spring {@code TaskExecutor} to use for running the listener threads.
215         * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
216         * starting up a number of new threads, according to the specified number
217         * of concurrent consumers.
218         * <p>Specify an alternative {@code TaskExecutor} for integration with an existing
219         * thread pool. Note that this really only adds value if the threads are
220         * managed in a specific fashion, for example within a Java EE environment.
221         * A plain thread pool does not add much value, as this listener container
222         * will occupy a number of threads for its entire lifetime.
223         * @see #setConcurrentConsumers
224         * @see org.springframework.core.task.SimpleAsyncTaskExecutor
225         * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
226         */
227        public void setTaskExecutor(Executor taskExecutor) {
228                this.taskExecutor = taskExecutor;
229        }
230
231        /**
232         * Specify the {@link BackOff} instance to use to compute the interval
233         * between recovery attempts. If the {@link BackOffExecution} implementation
234         * returns {@link BackOffExecution#STOP}, this listener container will not further
235         * attempt to recover.
236         * <p>The {@link #setRecoveryInterval(long) recovery interval} is ignored
237         * when this property is set.
238         * @since 4.1
239         */
240        public void setBackOff(BackOff backOff) {
241                this.backOff = backOff;
242        }
243
244        /**
245         * Specify the interval between recovery attempts, in <b>milliseconds</b>.
246         * The default is 5000 ms, that is, 5 seconds. This is a convenience method
247         * to create a {@link FixedBackOff} with the specified interval.
248         * <p>For more recovery options, consider specifying a {@link BackOff}
249         * instance instead.
250         * @see #setBackOff(BackOff)
251         * @see #handleListenerSetupFailure
252         */
253        public void setRecoveryInterval(long recoveryInterval) {
254                this.backOff = new FixedBackOff(recoveryInterval, Long.MAX_VALUE);
255        }
256
257        /**
258         * Specify the level of caching that this listener container is allowed to apply,
259         * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
260         * @see #setCacheLevel
261         */
262        public void setCacheLevelName(String constantName) throws IllegalArgumentException {
263                if (!constantName.startsWith("CACHE_")) {
264                        throw new IllegalArgumentException("Only cache constants allowed");
265                }
266                setCacheLevel(constants.asNumber(constantName).intValue());
267        }
268
269        /**
270         * Specify the level of caching that this listener container is allowed to apply.
271         * <p>Default is {@link #CACHE_NONE} if an external transaction manager has been specified
272         * (to reobtain all resources freshly within the scope of the external transaction),
273         * and {@link #CACHE_CONSUMER} otherwise (operating with local JMS resources).
274         * <p>Some Java EE servers only register their JMS resources with an ongoing XA
275         * transaction in case of a freshly obtained JMS {@code Connection} and {@code Session},
276         * which is why this listener container by default does not cache any of those.
277         * However, depending on the rules of your server with respect to the caching
278         * of transactional resources, consider switching this setting to at least
279         * {@link #CACHE_CONNECTION} or {@link #CACHE_SESSION} even in conjunction with an
280         * external transaction manager.
281         * @see #CACHE_NONE
282         * @see #CACHE_CONNECTION
283         * @see #CACHE_SESSION
284         * @see #CACHE_CONSUMER
285         * @see #setCacheLevelName
286         * @see #setTransactionManager
287         */
288        public void setCacheLevel(int cacheLevel) {
289                this.cacheLevel = cacheLevel;
290        }
291
292        /**
293         * Return the level of caching that this listener container is allowed to apply.
294         */
295        public int getCacheLevel() {
296                return this.cacheLevel;
297        }
298
299
300        /**
301         * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
302         * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
303         * <p>This listener container will always hold on to the minimum number of consumers
304         * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
305         * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
306         */
307        @Override
308        public void setConcurrency(String concurrency) {
309                try {
310                        int separatorIndex = concurrency.indexOf('-');
311                        if (separatorIndex != -1) {
312                                setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
313                                setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
314                        }
315                        else {
316                                setConcurrentConsumers(1);
317                                setMaxConcurrentConsumers(Integer.parseInt(concurrency));
318                        }
319                }
320                catch (NumberFormatException ex) {
321                        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
322                                        "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
323                }
324        }
325
326        /**
327         * Specify the number of concurrent consumers to create. Default is 1.
328         * <p>Specifying a higher value for this setting will increase the standard
329         * level of scheduled concurrent consumers at runtime: This is effectively
330         * the minimum number of concurrent consumers which will be scheduled
331         * at any given time. This is a static setting; for dynamic scaling,
332         * consider specifying the "maxConcurrentConsumers" setting instead.
333         * <p>Raising the number of concurrent consumers is recommendable in order
334         * to scale the consumption of messages coming in from a queue. However,
335         * note that any ordering guarantees are lost once multiple consumers are
336         * registered. In general, stick with 1 consumer for low-volume queues.
337         * <p><b>Do not raise the number of concurrent consumers for a topic,
338         * unless vendor-specific setup measures clearly allow for it.</b>
339         * With regular setup, this would lead to concurrent consumption
340         * of the same message, which is hardly ever desirable.
341         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
342         * @see #setMaxConcurrentConsumers
343         */
344        public void setConcurrentConsumers(int concurrentConsumers) {
345                Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
346                synchronized (this.lifecycleMonitor) {
347                        this.concurrentConsumers = concurrentConsumers;
348                        if (this.maxConcurrentConsumers < concurrentConsumers) {
349                                this.maxConcurrentConsumers = concurrentConsumers;
350                        }
351                }
352        }
353
354        /**
355         * Return the "concurrentConsumer" setting.
356         * <p>This returns the currently configured "concurrentConsumers" value;
357         * the number of currently scheduled/active consumers might differ.
358         * @see #getScheduledConsumerCount()
359         * @see #getActiveConsumerCount()
360         */
361        public final int getConcurrentConsumers() {
362                synchronized (this.lifecycleMonitor) {
363                        return this.concurrentConsumers;
364                }
365        }
366
367        /**
368         * Specify the maximum number of concurrent consumers to create. Default is 1.
369         * <p>If this setting is higher than "concurrentConsumers", the listener container
370         * will dynamically schedule new consumers at runtime, provided that enough
371         * incoming messages are encountered. Once the load goes down again, the number of
372         * consumers will be reduced to the standard level ("concurrentConsumers") again.
373         * <p>Raising the number of concurrent consumers is recommendable in order
374         * to scale the consumption of messages coming in from a queue. However,
375         * note that any ordering guarantees are lost once multiple consumers are
376         * registered. In general, stick with 1 consumer for low-volume queues.
377         * <p><b>Do not raise the number of concurrent consumers for a topic,
378         * unless vendor-specific setup measures clearly allow for it.</b>
379         * With regular setup, this would lead to concurrent consumption
380         * of the same message, which is hardly ever desirable.
381         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
382         * @see #setConcurrentConsumers
383         */
384        public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
385                Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
386                synchronized (this.lifecycleMonitor) {
387                        this.maxConcurrentConsumers = Math.max(maxConcurrentConsumers, this.concurrentConsumers);
388                }
389        }
390
391        /**
392         * Return the "maxConcurrentConsumer" setting.
393         * <p>This returns the currently configured "maxConcurrentConsumers" value;
394         * the number of currently scheduled/active consumers might differ.
395         * @see #getScheduledConsumerCount()
396         * @see #getActiveConsumerCount()
397         */
398        public final int getMaxConcurrentConsumers() {
399                synchronized (this.lifecycleMonitor) {
400                        return this.maxConcurrentConsumers;
401                }
402        }
403
404        /**
405         * Specify the maximum number of messages to process in one task.
406         * More concretely, this limits the number of message reception attempts
407         * per task, which includes receive iterations that did not actually
408         * pick up a message until they hit their timeout (see the
409         * {@link #setReceiveTimeout "receiveTimeout"} property).
410         * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
411         * reusing the original invoker threads until shutdown (at the
412         * expense of limited dynamic scheduling).
413         * <p>In case of a SchedulingTaskExecutor indicating a preference for
414         * short-lived tasks, the default is 10 instead. Specify a number
415         * of 10 to 100 messages to balance between rather long-lived and
416         * rather short-lived tasks here.
417         * <p>Long-lived tasks avoid frequent thread context switches through
418         * sticking with the same thread all the way through, while short-lived
419         * tasks allow thread pools to control the scheduling. Hence, thread
420         * pools will usually prefer short-lived tasks.
421         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
422         * @see #setTaskExecutor
423         * @see #setReceiveTimeout
424         * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
425         */
426        public void setMaxMessagesPerTask(int maxMessagesPerTask) {
427                Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
428                synchronized (this.lifecycleMonitor) {
429                        this.maxMessagesPerTask = maxMessagesPerTask;
430                }
431        }
432
433        /**
434         * Return the maximum number of messages to process in one task.
435         */
436        public final int getMaxMessagesPerTask() {
437                synchronized (this.lifecycleMonitor) {
438                        return this.maxMessagesPerTask;
439                }
440        }
441
442        /**
443         * Specify the limit for the number of consumers that are allowed to be idle
444         * at any given time.
445         * <p>This limit is used by the {@link #scheduleNewInvokerIfAppropriate} method
446         * to determine if a new invoker should be created. Increasing the limit causes
447         * invokers to be created more aggressively. This can be useful to ramp up the
448         * number of invokers faster.
449         * <p>The default is 1, only scheduling a new invoker (which is likely to
450         * be idle initially) if none of the existing invokers is currently idle.
451         */
452        public void setIdleConsumerLimit(int idleConsumerLimit) {
453                Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher");
454                synchronized (this.lifecycleMonitor) {
455                        this.idleConsumerLimit = idleConsumerLimit;
456                }
457        }
458
459        /**
460         * Return the limit for the number of idle consumers.
461         */
462        public final int getIdleConsumerLimit() {
463                synchronized (this.lifecycleMonitor) {
464                        return this.idleConsumerLimit;
465                }
466        }
467
468        /**
469         * Specify the limit for idle executions of a consumer task, not having
470         * received any message within its execution. If this limit is reached,
471         * the task will shut down and leave receiving to other executing tasks.
472         * <p>The default is 1, closing idle resources early once a task didn't
473         * receive a message. This applies to dynamic scheduling only; see the
474         * {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting.
475         * The minimum number of consumers
476         * (see {@link #setConcurrentConsumers "concurrentConsumers"})
477         * will be kept around until shutdown in any case.
478         * <p>Within each task execution, a number of message reception attempts
479         * (according to the "maxMessagesPerTask" setting) will each wait for an incoming
480         * message (according to the "receiveTimeout" setting). If all of those receive
481         * attempts in a given task return without a message, the task is considered
482         * idle with respect to received messages. Such a task may still be rescheduled;
483         * however, once it reached the specified "idleTaskExecutionLimit", it will
484         * shut down (in case of dynamic scaling).
485         * <p>Raise this limit if you encounter too frequent scaling up and down.
486         * With this limit being higher, an idle consumer will be kept around longer,
487         * avoiding the restart of a consumer once a new load of messages comes in.
488         * Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value,
489         * which will also lead to idle consumers being kept around for a longer time
490         * (while also increasing the average execution time of each scheduled task).
491         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
492         * @see #setMaxMessagesPerTask
493         * @see #setReceiveTimeout
494         */
495        public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
496                Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher");
497                synchronized (this.lifecycleMonitor) {
498                        this.idleTaskExecutionLimit = idleTaskExecutionLimit;
499                }
500        }
501
502        /**
503         * Return the limit for idle executions of a consumer task.
504         */
505        public final int getIdleTaskExecutionLimit() {
506                synchronized (this.lifecycleMonitor) {
507                        return this.idleTaskExecutionLimit;
508                }
509        }
510
511
512        //-------------------------------------------------------------------------
513        // Implementation of AbstractMessageListenerContainer's template methods
514        //-------------------------------------------------------------------------
515
516        @Override
517        public void initialize() {
518                // Adapt default cache level.
519                if (this.cacheLevel == CACHE_AUTO) {
520                        this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
521                }
522
523                // Prepare taskExecutor and maxMessagesPerTask.
524                synchronized (this.lifecycleMonitor) {
525                        if (this.taskExecutor == null) {
526                                this.taskExecutor = createDefaultTaskExecutor();
527                        }
528                        else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
529                                        ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
530                                        this.maxMessagesPerTask == Integer.MIN_VALUE) {
531                                // TaskExecutor indicated a preference for short-lived tasks. According to
532                                // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
533                                // unless the user specified a custom value.
534                                this.maxMessagesPerTask = 10;
535                        }
536                }
537
538                // Proceed with actual listener initialization.
539                super.initialize();
540        }
541
542        /**
543         * Creates the specified number of concurrent consumers,
544         * in the form of a JMS Session plus associated MessageConsumer
545         * running in a separate thread.
546         * @see #scheduleNewInvoker
547         * @see #setTaskExecutor
548         */
549        @Override
550        protected void doInitialize() throws JMSException {
551                synchronized (this.lifecycleMonitor) {
552                        for (int i = 0; i < this.concurrentConsumers; i++) {
553                                scheduleNewInvoker();
554                        }
555                }
556        }
557
558        /**
559         * Destroy the registered JMS Sessions and associated MessageConsumers.
560         */
561        @Override
562        protected void doShutdown() throws JMSException {
563                logger.debug("Waiting for shutdown of message listener invokers");
564                try {
565                        synchronized (this.lifecycleMonitor) {
566                                long receiveTimeout = getReceiveTimeout();
567                                long waitStartTime = System.currentTimeMillis();
568                                int waitCount = 0;
569                                while (this.activeInvokerCount > 0) {
570                                        if (waitCount > 0 && !isAcceptMessagesWhileStopping() &&
571                                                        System.currentTimeMillis() - waitStartTime >= receiveTimeout) {
572                                                // Unexpectedly some invokers are still active after the receive timeout period
573                                                // -> interrupt remaining receive attempts since we'd reject the messages anyway
574                                                for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
575                                                        scheduledInvoker.interruptIfNecessary();
576                                                }
577                                        }
578                                        if (logger.isDebugEnabled()) {
579                                                logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
580                                                                " message listener invokers (iteration " + waitCount + ")");
581                                        }
582                                        // Wait for AsyncMessageListenerInvokers to deactivate themselves...
583                                        if (receiveTimeout > 0) {
584                                                this.lifecycleMonitor.wait(receiveTimeout);
585                                        }
586                                        else {
587                                                this.lifecycleMonitor.wait();
588                                        }
589                                        waitCount++;
590                                }
591                                // Clear remaining scheduled invokers, possibly left over as paused tasks
592                                for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
593                                        scheduledInvoker.clearResources();
594                                }
595                                this.scheduledInvokers.clear();
596                        }
597                }
598                catch (InterruptedException ex) {
599                        // Re-interrupt current thread, to allow other threads to react.
600                        Thread.currentThread().interrupt();
601                }
602        }
603
604        /**
605         * Overridden to reset the stop callback, if any.
606         */
607        @Override
608        public void start() throws JmsException {
609                synchronized (this.lifecycleMonitor) {
610                        this.stopCallback = null;
611                }
612                super.start();
613        }
614
615        /**
616         * Stop this listener container, invoking the specific callback
617         * once all listener processing has actually stopped.
618         * <p>Note: Further {@code stop(runnable)} calls (before processing
619         * has actually stopped) will override the specified callback. Only the
620         * latest specified callback will be invoked.
621         * <p>If a subsequent {@link #start()} call restarts the listener container
622         * before it has fully stopped, the callback will not get invoked at all.
623         * @param callback the callback to invoke once listener processing
624         * has fully stopped
625         * @throws JmsException if stopping failed
626         * @see #stop()
627         */
628        @Override
629        public void stop(Runnable callback) throws JmsException {
630                synchronized (this.lifecycleMonitor) {
631                        if (!isRunning() || this.stopCallback != null) {
632                                // Not started, already stopped, or previous stop attempt in progress
633                                // -> return immediately, no stop process to control anymore.
634                                callback.run();
635                                return;
636                        }
637                        this.stopCallback = callback;
638                }
639                stop();
640        }
641
642        /**
643         * Return the number of currently scheduled consumers.
644         * <p>This number will always be between "concurrentConsumers" and
645         * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
646         * (in case some consumers are scheduled but not executing at the moment).
647         * @see #getConcurrentConsumers()
648         * @see #getMaxConcurrentConsumers()
649         * @see #getActiveConsumerCount()
650         */
651        public final int getScheduledConsumerCount() {
652                synchronized (this.lifecycleMonitor) {
653                        return this.scheduledInvokers.size();
654                }
655        }
656
657        /**
658         * Return the number of currently active consumers.
659         * <p>This number will always be between "concurrentConsumers" and
660         * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount"
661         * (in case some consumers are scheduled but not executing at the moment).
662         * @see #getConcurrentConsumers()
663         * @see #getMaxConcurrentConsumers()
664         * @see #getActiveConsumerCount()
665         */
666        public final int getActiveConsumerCount() {
667                synchronized (this.lifecycleMonitor) {
668                        return this.activeInvokerCount;
669                }
670        }
671
672        /**
673         * Return whether at least one consumer has entered a fixed registration with the
674         * target destination. This is particularly interesting for the pub-sub case where
675         * it might be important to have an actual consumer registered that is guaranteed
676         * not to miss any messages that are just about to be published.
677         * <p>This method may be polled after a {@link #start()} call, until asynchronous
678         * registration of consumers has happened which is when the method will start returning
679         * {@code true} &ndash; provided that the listener container ever actually establishes
680         * a fixed registration. It will then keep returning {@code true} until shutdown,
681         * since the container will hold on to at least one consumer registration thereafter.
682         * <p>Note that a listener container is not bound to having a fixed registration in
683         * the first place. It may also keep recreating consumers for every invoker execution.
684         * This particularly depends on the {@link #setCacheLevel cache level} setting:
685         * only {@link #CACHE_CONSUMER} will lead to a fixed registration.
686         */
687        public boolean isRegisteredWithDestination() {
688                synchronized (this.lifecycleMonitor) {
689                        return (this.registeredWithDestination > 0);
690                }
691        }
692
693
694        /**
695         * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
696         * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
697         * with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
698         * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
699         */
700        protected TaskExecutor createDefaultTaskExecutor() {
701                String beanName = getBeanName();
702                String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
703                return new SimpleAsyncTaskExecutor(threadNamePrefix);
704        }
705
706        /**
707         * Schedule a new invoker, increasing the total number of scheduled
708         * invokers for this listener container.
709         */
710        private void scheduleNewInvoker() {
711                AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
712                if (rescheduleTaskIfNecessary(invoker)) {
713                        // This should always be true, since we're only calling this when active.
714                        this.scheduledInvokers.add(invoker);
715                }
716        }
717
718        /**
719         * Use a shared JMS Connection depending on the "cacheLevel" setting.
720         * @see #setCacheLevel
721         * @see #CACHE_CONNECTION
722         */
723        @Override
724        protected final boolean sharedConnectionEnabled() {
725                return (getCacheLevel() >= CACHE_CONNECTION);
726        }
727
728        /**
729         * Re-executes the given task via this listener container's TaskExecutor.
730         * @see #setTaskExecutor
731         */
732        @Override
733        protected void doRescheduleTask(Object task) {
734                Assert.state(this.taskExecutor != null, "No TaskExecutor available");
735                this.taskExecutor.execute((Runnable) task);
736        }
737
738        /**
739         * Tries scheduling a new invoker, since we know messages are coming in...
740         * @see #scheduleNewInvokerIfAppropriate()
741         */
742        @Override
743        protected void messageReceived(Object invoker, Session session) {
744                ((AsyncMessageListenerInvoker) invoker).setIdle(false);
745                scheduleNewInvokerIfAppropriate();
746        }
747
748        /**
749         * Marks the affected invoker as idle.
750         */
751        @Override
752        protected void noMessageReceived(Object invoker, Session session) {
753                ((AsyncMessageListenerInvoker) invoker).setIdle(true);
754        }
755
756        /**
757         * Schedule a new invoker, increasing the total number of scheduled
758         * invokers for this listener container, but only if the specified
759         * "maxConcurrentConsumers" limit has not been reached yet, and only
760         * if the specified "idleConsumerLimit" has not been reached either.
761         * <p>Called once a message has been received, in order to scale up while
762         * processing the message in the invoker that originally received it.
763         * @see #setTaskExecutor
764         * @see #getMaxConcurrentConsumers()
765         * @see #getIdleConsumerLimit()
766         */
767        protected void scheduleNewInvokerIfAppropriate() {
768                if (isRunning()) {
769                        resumePausedTasks();
770                        synchronized (this.lifecycleMonitor) {
771                                if (this.scheduledInvokers.size() < this.maxConcurrentConsumers &&
772                                                getIdleInvokerCount() < this.idleConsumerLimit) {
773                                        scheduleNewInvoker();
774                                        if (logger.isDebugEnabled()) {
775                                                logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());
776                                        }
777                                }
778                        }
779                }
780        }
781
782        /**
783         * Determine whether the current invoker should be rescheduled,
784         * given that it might not have received a message in a while.
785         * @param idleTaskExecutionCount the number of idle executions
786         * that this invoker task has already accumulated (in a row)
787         */
788        private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
789                boolean superfluous =
790                                (idleTaskExecutionCount >= this.idleTaskExecutionLimit && getIdleInvokerCount() > 1);
791                return (this.scheduledInvokers.size() <=
792                                (superfluous ? this.concurrentConsumers : this.maxConcurrentConsumers));
793        }
794
795        /**
796         * Determine whether this listener container currently has more
797         * than one idle instance among its scheduled invokers.
798         */
799        private int getIdleInvokerCount() {
800                int count = 0;
801                for (AsyncMessageListenerInvoker invoker : this.scheduledInvokers) {
802                        if (invoker.isIdle()) {
803                                count++;
804                        }
805                }
806                return count;
807        }
808
809
810        /**
811         * Overridden to accept a failure in the initial setup - leaving it up to the
812         * asynchronous invokers to establish the shared Connection on first access.
813         * @see #refreshConnectionUntilSuccessful()
814         */
815        @Override
816        protected void establishSharedConnection() {
817                try {
818                        super.establishSharedConnection();
819                }
820                catch (Exception ex) {
821                        if (ex instanceof JMSException) {
822                                invokeExceptionListener((JMSException) ex);
823                        }
824                        logger.debug("Could not establish shared JMS Connection - " +
825                                        "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
826                }
827        }
828
829        /**
830         * This implementations proceeds even after an exception thrown from
831         * {@code Connection.start()}, relying on listeners to perform
832         * appropriate recovery.
833         */
834        @Override
835        protected void startSharedConnection() {
836                try {
837                        super.startSharedConnection();
838                }
839                catch (Exception ex) {
840                        logger.debug("Connection start failed - relying on listeners to perform recovery", ex);
841                }
842        }
843
844        /**
845         * This implementations proceeds even after an exception thrown from
846         * {@code Connection.stop()}, relying on listeners to perform
847         * appropriate recovery after a restart.
848         */
849        @Override
850        protected void stopSharedConnection() {
851                try {
852                        super.stopSharedConnection();
853                }
854                catch (Exception ex) {
855                        logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex);
856                }
857        }
858
859        /**
860         * Handle the given exception that arose during setup of a listener.
861         * Called for every such exception in every concurrent listener.
862         * <p>The default implementation logs the exception at warn level
863         * if not recovered yet, and at debug level if already recovered.
864         * Can be overridden in subclasses.
865         * @param ex the exception to handle
866         * @param alreadyRecovered whether a previously executing listener
867         * already recovered from the present listener setup failure
868         * (this usually indicates a follow-up failure than can be ignored
869         * other than for debug log purposes)
870         * @see #recoverAfterListenerSetupFailure()
871         */
872        protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) {
873                if (ex instanceof JMSException) {
874                        invokeExceptionListener((JMSException) ex);
875                }
876                if (ex instanceof SharedConnectionNotInitializedException) {
877                        if (!alreadyRecovered) {
878                                logger.debug("JMS message listener invoker needs to establish shared Connection");
879                        }
880                }
881                else {
882                        // Recovery during active operation..
883                        if (alreadyRecovered) {
884                                logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
885                        }
886                        else {
887                                StringBuilder msg = new StringBuilder();
888                                msg.append("Setup of JMS message listener invoker failed for destination '");
889                                msg.append(getDestinationDescription()).append("' - trying to recover. Cause: ");
890                                msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
891                                if (logger.isDebugEnabled()) {
892                                        logger.warn(msg, ex);
893                                }
894                                else {
895                                        logger.warn(msg);
896                                }
897                        }
898                }
899        }
900
901        /**
902         * Recover this listener container after a listener failed to set itself up,
903         * for example re-establishing the underlying Connection.
904         * <p>The default implementation delegates to DefaultMessageListenerContainer's
905         * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
906         * try to re-establish a Connection to the JMS provider both for the shared
907         * and the non-shared Connection case.
908         * @see #refreshConnectionUntilSuccessful()
909         * @see #refreshDestination()
910         */
911        protected void recoverAfterListenerSetupFailure() {
912                this.recovering = true;
913                try {
914                        refreshConnectionUntilSuccessful();
915                        refreshDestination();
916                }
917                finally {
918                        this.recovering = false;
919                        this.interrupted = false;
920                }
921        }
922
923        /**
924         * Refresh the underlying Connection, not returning before an attempt has been
925         * successful. Called in case of a shared Connection as well as without shared
926         * Connection, so either needs to operate on the shared Connection or on a
927         * temporary Connection that just gets established for validation purposes.
928         * <p>The default implementation retries until it successfully established a
929         * Connection, for as long as this message listener container is running.
930         * Applies the specified recovery interval between retries.
931         * @see #setRecoveryInterval
932         * @see #start()
933         * @see #stop()
934         */
935        protected void refreshConnectionUntilSuccessful() {
936                BackOffExecution execution = this.backOff.start();
937                while (isRunning()) {
938                        try {
939                                if (sharedConnectionEnabled()) {
940                                        refreshSharedConnection();
941                                }
942                                else {
943                                        Connection con = createConnection();
944                                        JmsUtils.closeConnection(con);
945                                }
946                                logger.debug("Successfully refreshed JMS Connection");
947                                break;
948                        }
949                        catch (Exception ex) {
950                                if (ex instanceof JMSException) {
951                                        invokeExceptionListener((JMSException) ex);
952                                }
953                                StringBuilder msg = new StringBuilder();
954                                msg.append("Could not refresh JMS Connection for destination '");
955                                msg.append(getDestinationDescription()).append("' - retrying using ");
956                                msg.append(execution).append(". Cause: ");
957                                msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
958                                if (logger.isDebugEnabled()) {
959                                        logger.error(msg, ex);
960                                }
961                                else {
962                                        logger.error(msg);
963                                }
964                        }
965                        if (!applyBackOffTime(execution)) {
966                                StringBuilder msg = new StringBuilder();
967                                msg.append("Stopping container for destination '")
968                                                .append(getDestinationDescription())
969                                                .append("': back-off policy does not allow ").append("for further attempts.");
970                                logger.error(msg.toString());
971                                stop();
972                        }
973                }
974        }
975
976        /**
977         * Refresh the JMS destination that this listener container operates on.
978         * <p>Called after listener setup failure, assuming that a cached Destination
979         * object might have become invalid (a typical case on WebLogic JMS).
980         * <p>The default implementation removes the destination from a
981         * DestinationResolver's cache, in case of a CachingDestinationResolver.
982         * @see #setDestinationName
983         * @see org.springframework.jms.support.destination.CachingDestinationResolver
984         */
985        protected void refreshDestination() {
986                String destName = getDestinationName();
987                if (destName != null) {
988                        DestinationResolver destResolver = getDestinationResolver();
989                        if (destResolver instanceof CachingDestinationResolver) {
990                                ((CachingDestinationResolver) destResolver).removeFromCache(destName);
991                        }
992                }
993        }
994
995        /**
996         * Apply the next back-off time using the specified {@link BackOffExecution}.
997         * <p>Return {@code true} if the back-off period has been applied and a new
998         * attempt to recover should be made, {@code false} if no further attempt
999         * should be made.
1000         * @since 4.1
1001         */
1002        protected boolean applyBackOffTime(BackOffExecution execution) {
1003                if (this.recovering && this.interrupted) {
1004                        // Interrupted right before and still failing... give up.
1005                        return false;
1006                }
1007                long interval = execution.nextBackOff();
1008                if (interval == BackOffExecution.STOP) {
1009                        return false;
1010                }
1011                else {
1012                        try {
1013                                synchronized (this.lifecycleMonitor) {
1014                                        this.lifecycleMonitor.wait(interval);
1015                                }
1016                        }
1017                        catch (InterruptedException interEx) {
1018                                // Re-interrupt current thread, to allow other threads to react.
1019                                Thread.currentThread().interrupt();
1020                                if (this.recovering) {
1021                                        this.interrupted = true;
1022                                }
1023                        }
1024                        return true;
1025                }
1026        }
1027
1028        /**
1029         * Return whether this listener container is currently in a recovery attempt.
1030         * <p>May be used to detect recovery phases but also the end of a recovery phase,
1031         * with {@code isRecovering()} switching to {@code false} after having been found
1032         * to return {@code true} before.
1033         * @see #recoverAfterListenerSetupFailure()
1034         */
1035        public final boolean isRecovering() {
1036                return this.recovering;
1037        }
1038
1039
1040        //-------------------------------------------------------------------------
1041        // Inner classes used as internal adapters
1042        //-------------------------------------------------------------------------
1043
1044        /**
1045         * Runnable that performs looped {@code MessageConsumer.receive()} calls.
1046         */
1047        private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {
1048
1049                @Nullable
1050                private Session session;
1051
1052                @Nullable
1053                private MessageConsumer consumer;
1054
1055                @Nullable
1056                private Object lastRecoveryMarker;
1057
1058                private boolean lastMessageSucceeded;
1059
1060                private int idleTaskExecutionCount = 0;
1061
1062                private volatile boolean idle = true;
1063
1064                @Nullable
1065                private volatile Thread currentReceiveThread;
1066
                /**
                 * Execute one run of this invoker.
                 * <p>Registers the invoker as active, then receives messages: either via
                 * {@code executeOngoingLoop()} if "maxMessagesPerTask" is negative, or via
                 * up to "maxMessagesPerTask" {@code invokeListener()} calls for as long as
                 * the container is running. Any {@code Throwable} leads to the invoker's
                 * resources being cleared and to listener setup failure handling, with
                 * recovery performed by at most one invoker per recovery marker.
                 * <p>Finally, the invoker deregisters itself as active, updates its idle
                 * execution count and either gets rescheduled or removed from the set of
                 * scheduled invokers.
                 */
                @Override
                public void run() {
                        // Register as active and wake up any thread waiting on the lifecycle monitor.
                        synchronized (lifecycleMonitor) {
                                activeInvokerCount++;
                                lifecycleMonitor.notifyAll();
                        }
                        boolean messageReceived = false;
                        try {
                                if (maxMessagesPerTask < 0) {
                                        // Negative limit: delegate to the ongoing receive loop.
                                        messageReceived = executeOngoingLoop();
                                }
                                else {
                                        // Bounded task: at most maxMessagesPerTask receive attempts while running.
                                        int messageCount = 0;
                                        while (isRunning() && messageCount < maxMessagesPerTask) {
                                                messageReceived = (invokeListener() || messageReceived);
                                                messageCount++;
                                        }
                                }
                        }
                        catch (Throwable ex) {
                                clearResources();
                                if (!this.lastMessageSucceeded) {
                                        // We failed more than once in a row or on startup -
                                        // wait before first recovery attempt.
                                        waitBeforeRecoveryAttempt();
                                }
                                this.lastMessageSucceeded = false;
                                boolean alreadyRecovered = false;
                                synchronized (recoveryMonitor) {
                                        // Recover only if nobody replaced the recovery marker that this
                                        // invoker last saw; otherwise another invoker recovered already.
                                        if (this.lastRecoveryMarker == currentRecoveryMarker) {
                                                handleListenerSetupFailure(ex, false);
                                                recoverAfterListenerSetupFailure();
                                                // Publish a fresh marker so that invokers still holding the
                                                // old one take the "already recovered" branch.
                                                currentRecoveryMarker = new Object();
                                        }
                                        else {
                                                alreadyRecovered = true;
                                        }
                                }
                                // Report the follow-up failure outside of the recovery monitor.
                                if (alreadyRecovered) {
                                        handleListenerSetupFailure(ex, true);
                                }
                        }
                        finally {
                                // Deregister as active (possibly triggering the stop callback) and notify waiters.
                                synchronized (lifecycleMonitor) {
                                        decreaseActiveInvokerCount();
                                        lifecycleMonitor.notifyAll();
                                }
                                // Track how many executions in a row ended without any message.
                                if (!messageReceived) {
                                        this.idleTaskExecutionCount++;
                                }
                                else {
                                        this.idleTaskExecutionCount = 0;
                                }
                                synchronized (lifecycleMonitor) {
                                        // The reschedule attempt only happens if this invoker should be kept at all.
                                        if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
                                                // We're shutting down completely.
                                                scheduledInvokers.remove(this);
                                                if (logger.isDebugEnabled()) {
                                                        logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
                                                }
                                                lifecycleMonitor.notifyAll();
                                                clearResources();
                                        }
                                        else if (isRunning()) {
                                                // Still scheduled while the container is running: check how many
                                                // scheduled consumers are not paused and log if too few are left.
                                                int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
                                                if (nonPausedConsumers < 1) {
                                                        logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
                                                                        "Check your thread pool configuration! Manual recovery necessary through a start() call.");
                                                }
                                                else if (nonPausedConsumers < getConcurrentConsumers()) {
                                                        logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
                                                                        "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
                                                                        "to be triggered by remaining consumers.");
                                                }
                                        }
                                }
                        }
                }
1145
                /**
                 * Keep invoking the listener for as long as the container is active.
                 * <p>While the container is active but not running, this invoker waits on
                 * the lifecycle monitor and does not count as an active invoker during
                 * that time. The loop also ends once more invokers are scheduled than
                 * "maxConcurrentConsumers" allows.
                 * @return whether at least one {@code invokeListener()} call returned {@code true}
                 * @throws JMSException if thrown by {@code invokeListener()}
                 * @throws IllegalStateException if the thread got interrupted while waiting
                 * for a restart and the container is still active but not running afterwards
                 */
                private boolean executeOngoingLoop() throws JMSException {
                        boolean messageReceived = false;
                        boolean active = true;
                        while (active) {
                                synchronized (lifecycleMonitor) {
                                        boolean interrupted = false;
                                        boolean wasWaiting = false;
                                        // Block for as long as the container is active but not running.
                                        while ((active = isActive()) && !isRunning()) {
                                                if (interrupted) {
                                                        throw new IllegalStateException("Thread was interrupted while waiting for " +
                                                                        "a restart of the listener container, but container is still stopped");
                                                }
                                                // Deregister as active only once, before the first wait.
                                                if (!wasWaiting) {
                                                        decreaseActiveInvokerCount();
                                                }
                                                wasWaiting = true;
                                                try {
                                                        lifecycleMonitor.wait();
                                                }
                                                catch (InterruptedException ex) {
                                                        // Re-interrupt current thread, to allow other threads to react.
                                                        Thread.currentThread().interrupt();
                                                        interrupted = true;
                                                }
                                        }
                                        // Count as active again after having waited.
                                        if (wasWaiting) {
                                                activeInvokerCount++;
                                        }
                                        // More invokers scheduled than allowed: let this one end its loop.
                                        if (scheduledInvokers.size() > maxConcurrentConsumers) {
                                                active = false;
                                        }
                                }
                                // Receive outside of the lifecycle monitor.
                                if (active) {
                                        messageReceived = (invokeListener() || messageReceived);
                                }
                        }
                        return messageReceived;
                }
1184
1185                private boolean invokeListener() throws JMSException {
1186                        this.currentReceiveThread = Thread.currentThread();
1187                        try {
1188                                initResourcesIfNecessary();
1189                                boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
1190                                this.lastMessageSucceeded = true;
1191                                return messageReceived;
1192                        }
1193                        finally {
1194                                this.currentReceiveThread = null;
1195                        }
1196                }
1197
1198                private void decreaseActiveInvokerCount() {
1199                        activeInvokerCount--;
1200                        if (stopCallback != null && activeInvokerCount == 0) {
1201                                stopCallback.run();
1202                                stopCallback = null;
1203                        }
1204                }
1205
1206                private void initResourcesIfNecessary() throws JMSException {
1207                        if (getCacheLevel() <= CACHE_CONNECTION) {
1208                                updateRecoveryMarker();
1209                        }
1210                        else {
1211                                if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
1212                                        updateRecoveryMarker();
1213                                        this.session = createSession(getSharedConnection());
1214                                }
1215                                if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
1216                                        this.consumer = createListenerConsumer(this.session);
1217                                        synchronized (lifecycleMonitor) {
1218                                                registeredWithDestination++;
1219                                        }
1220                                }
1221                        }
1222                }
1223
1224                private void updateRecoveryMarker() {
1225                        synchronized (recoveryMonitor) {
1226                                this.lastRecoveryMarker = currentRecoveryMarker;
1227                        }
1228                }
1229
1230                private void interruptIfNecessary() {
1231                        Thread currentReceiveThread = this.currentReceiveThread;
1232                        if (currentReceiveThread != null && !currentReceiveThread.isInterrupted()) {
1233                                currentReceiveThread.interrupt();
1234                        }
1235                }
1236
1237                private void clearResources() {
1238                        if (sharedConnectionEnabled()) {
1239                                synchronized (sharedConnectionMonitor) {
1240                                        JmsUtils.closeMessageConsumer(this.consumer);
1241                                        JmsUtils.closeSession(this.session);
1242                                }
1243                        }
1244                        else {
1245                                JmsUtils.closeMessageConsumer(this.consumer);
1246                                JmsUtils.closeSession(this.session);
1247                        }
1248                        if (this.consumer != null) {
1249                                synchronized (lifecycleMonitor) {
1250                                        registeredWithDestination--;
1251                                }
1252                        }
1253                        this.consumer = null;
1254                        this.session = null;
1255                }
1256
                /**
                 * Apply the back-off time once. In a regular scenario, the back-off is only applied if we
                 * failed to recover with the broker. This additional wait period avoids a burst retry
                 * scenario when the broker is actually up but something else is failing (i.e. listener
                 * specific).
                 */
1263                private void waitBeforeRecoveryAttempt() {
1264                        BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
1265                        applyBackOffTime(execution);
1266                }
1267
1268                @Override
1269                public boolean isLongLived() {
1270                        return (maxMessagesPerTask < 0);
1271                }
1272
1273                public void setIdle(boolean idle) {
1274                        this.idle = idle;
1275                }
1276
1277                public boolean isIdle() {
1278                        return this.idle;
1279                }
1280        }
1281
1282}