001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import java.util.HashSet;
020import java.util.Set;
021import java.util.concurrent.Executor;
022import javax.jms.Connection;
023import javax.jms.JMSException;
024import javax.jms.MessageConsumer;
025import javax.jms.Session;
026
027import org.springframework.core.Constants;
028import org.springframework.core.task.SimpleAsyncTaskExecutor;
029import org.springframework.core.task.TaskExecutor;
030import org.springframework.jms.JmsException;
031import org.springframework.jms.support.JmsUtils;
032import org.springframework.jms.support.destination.CachingDestinationResolver;
033import org.springframework.jms.support.destination.DestinationResolver;
034import org.springframework.scheduling.SchedulingAwareRunnable;
035import org.springframework.scheduling.SchedulingTaskExecutor;
036import org.springframework.util.Assert;
037import org.springframework.util.ClassUtils;
038import org.springframework.util.backoff.BackOff;
039import org.springframework.util.backoff.BackOffExecution;
040import org.springframework.util.backoff.FixedBackOff;
041
042/**
043 * Message listener container variant that uses plain JMS client APIs, specifically
044 * a loop of {@code MessageConsumer.receive()} calls that also allow for
045 * transactional reception of messages (registering them with XA transactions).
046 * Designed to work in a native JMS environment as well as in a Java EE environment,
047 * with only minimal differences in configuration.
048 *
049 * <p>This is a simple but nevertheless powerful form of message listener container.
050 * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
051 * and optionally allows for dynamic adaptation at runtime (up to a maximum number).
052 * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
053 * of runtime complexity, in particular the minimal requirements on the JMS provider:
054 * not even the JMS {@code ServerSessionPool} facility is required. Beyond that, it is
055 * fully self-recovering in case the broker is temporarily unavailable, and allows
056 * for stops/restarts as well as runtime changes to its configuration.
057 *
058 * <p>Actual {@code MessageListener} execution happens in asynchronous work units which are
059 * created through Spring's {@link org.springframework.core.task.TaskExecutor TaskExecutor}
060 * abstraction. By default, the specified number of invoker tasks will be created
061 * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
062 * setting. Specify an alternative {@code TaskExecutor} to integrate with an existing
063 * thread pool facility (such as a Java EE server's), for example using a
064 * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
065 * With a native JMS setup, each of those listener threads is going to use a
066 * cached JMS {@code Session} and {@code MessageConsumer} (only refreshed in case
067 * of failure), using the JMS provider's resources as efficiently as possible.
068 *
069 * <p>Message reception and listener execution can automatically be wrapped
070 * in transactions by passing a Spring
071 * {@link org.springframework.transaction.PlatformTransactionManager} into the
072 * {@link #setTransactionManager "transactionManager"} property. This will usually
073 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
074 * Java EE environment, in combination with a JTA-aware JMS {@code ConnectionFactory}
075 * obtained from JNDI (check your Java EE server's documentation). Note that this
076 * listener container will automatically reobtain all JMS handles for each transaction
077 * in case an external transaction manager is specified, for compatibility with
078 * all Java EE servers (in particular JBoss). This non-caching behavior can be
079 * overridden through the {@link #setCacheLevel "cacheLevel"} /
080 * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching of
081 * the {@code Connection} (or also {@code Session} and {@code MessageConsumer})
082 * even if an external transaction manager is involved.
083 *
084 * <p>Dynamic scaling of the number of concurrent invokers can be activated
085 * by specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
086 * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
087 * value. Since the latter's default is 1, you can also simply specify a
088 * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
089 * 5 concurrent consumers in case of increasing message load, as well as dynamic
090 * shrinking back to the standard number of consumers once the load decreases.
091 * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
092 * setting to control the lifespan of each new task, to avoid frequent scaling up
093 * and down, in particular if the {@code ConnectionFactory} does not pool JMS
094 * {@code Sessions} and/or the {@code TaskExecutor} does not pool threads (check
095 * your configuration!). Note that dynamic scaling only really makes sense for a
096 * queue in the first place; for a topic, you will typically stick with the default
097 * number of 1 consumer, otherwise you'd receive the same message multiple times on
098 * the same node.
099 *
100 * <p><b>Note: Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
101 * in combination with dynamic scaling.</b> Ideally, don't use it with a message
102 * listener container at all, since it is generally preferable to let the
103 * listener container itself handle appropriate caching within its lifecycle.
104 * Also, stopping and restarting a listener container will only work with an
105 * independent, locally cached Connection - not with an externally cached one.
106 *
107 * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
108 * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
109 * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
110 * javadoc for details on acknowledge modes and native transaction options, as
111 * well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
112 * on configuring an external transaction manager. Note that for the default
113 * "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
114 * before listener execution, with no redelivery in case of an exception.
115 *
116 * @author Juergen Hoeller
117 * @since 2.0
118 * @see #setTransactionManager
119 * @see #setCacheLevel
120 * @see javax.jms.MessageConsumer#receive(long)
121 * @see SimpleMessageListenerContainer
122 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
123 */
124public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
125
126        /**
127         * Default thread name prefix: "DefaultMessageListenerContainer-".
128         */
129        public static final String DEFAULT_THREAD_NAME_PREFIX =
130                        ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-";
131
132        /**
133         * The default recovery interval: 5000 ms = 5 seconds.
134         */
135        public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
136
137
138        /**
139         * Constant that indicates to cache no JMS resources at all.
140         * @see #setCacheLevel
141         */
142        public static final int CACHE_NONE = 0;
143
144        /**
145         * Constant that indicates to cache a shared JMS {@code Connection} for each
146         * listener thread.
147         * @see #setCacheLevel
148         */
149        public static final int CACHE_CONNECTION = 1;
150
151        /**
152         * Constant that indicates to cache a shared JMS {@code Connection} and a JMS
153         * {@code Session} for each listener thread.
154         * @see #setCacheLevel
155         */
156        public static final int CACHE_SESSION = 2;
157
158        /**
159         * Constant that indicates to cache a shared JMS {@code Connection}, a JMS
160         * {@code Session}, and a JMS MessageConsumer for each listener thread.
161         * @see #setCacheLevel
162         */
163        public static final int CACHE_CONSUMER = 3;
164
165        /**
166         * Constant that indicates automatic choice of an appropriate caching level
167         * (depending on the transaction management strategy).
168         * @see #setCacheLevel
169         */
170        public static final int CACHE_AUTO = 4;
171
172
173        private static final Constants constants = new Constants(DefaultMessageListenerContainer.class);
174
175
176        private Executor taskExecutor;
177
178        private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE);
179
180        private int cacheLevel = CACHE_AUTO;
181
182        private int concurrentConsumers = 1;
183
184        private int maxConcurrentConsumers = 1;
185
186        private int maxMessagesPerTask = Integer.MIN_VALUE;
187
188        private int idleConsumerLimit = 1;
189
190        private int idleTaskExecutionLimit = 1;
191
192        private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<AsyncMessageListenerInvoker>();
193
194        private int activeInvokerCount = 0;
195
196        private int registeredWithDestination = 0;
197
198        private volatile boolean recovering = false;
199
200        private volatile boolean interrupted = false;
201
202        private Runnable stopCallback;
203
204        private Object currentRecoveryMarker = new Object();
205
206        private final Object recoveryMonitor = new Object();
207
208
209        /**
210         * Set the Spring {@code TaskExecutor} to use for running the listener threads.
211         * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor},
212         * starting up a number of new threads, according to the specified number
213         * of concurrent consumers.
214         * <p>Specify an alternative {@code TaskExecutor} for integration with an existing
215         * thread pool. Note that this really only adds value if the threads are
216         * managed in a specific fashion, for example within a Java EE environment.
217         * A plain thread pool does not add much value, as this listener container
218         * will occupy a number of threads for its entire lifetime.
219         * @see #setConcurrentConsumers
220         * @see org.springframework.core.task.SimpleAsyncTaskExecutor
221         * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
222         */
223        public void setTaskExecutor(Executor taskExecutor) {
224                this.taskExecutor = taskExecutor;
225        }
226
227        /**
228         * Specify the {@link BackOff} instance to use to compute the interval
229         * between recovery attempts. If the {@link BackOffExecution} implementation
230         * returns {@link BackOffExecution#STOP}, this listener container will not further
231         * attempt to recover.
232         * <p>The {@link #setRecoveryInterval(long) recovery interval} is ignored
233         * when this property is set.
234         * @since 4.1
235         */
236        public void setBackOff(BackOff backOff) {
237                this.backOff = backOff;
238        }
239
240        /**
241         * Specify the interval between recovery attempts, in <b>milliseconds</b>.
242         * The default is 5000 ms, that is, 5 seconds. This is a convenience method
243         * to create a {@link FixedBackOff} with the specified interval.
244         * <p>For more recovery options, consider specifying a {@link BackOff}
245         * instance instead.
246         * @see #setBackOff(BackOff)
247         * @see #handleListenerSetupFailure
248         */
249        public void setRecoveryInterval(long recoveryInterval) {
250                this.backOff = new FixedBackOff(recoveryInterval, Long.MAX_VALUE);
251        }
252
253        /**
254         * Specify the level of caching that this listener container is allowed to apply,
255         * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION".
256         * @see #setCacheLevel
257         */
258        public void setCacheLevelName(String constantName) throws IllegalArgumentException {
259                if (constantName == null || !constantName.startsWith("CACHE_")) {
260                        throw new IllegalArgumentException("Only cache constants allowed");
261                }
262                setCacheLevel(constants.asNumber(constantName).intValue());
263        }
264
265        /**
266         * Specify the level of caching that this listener container is allowed to apply.
267         * <p>Default is {@link #CACHE_NONE} if an external transaction manager has been specified
268         * (to reobtain all resources freshly within the scope of the external transaction),
269         * and {@link #CACHE_CONSUMER} otherwise (operating with local JMS resources).
270         * <p>Some Java EE servers only register their JMS resources with an ongoing XA
271         * transaction in case of a freshly obtained JMS {@code Connection} and {@code Session},
272         * which is why this listener container by default does not cache any of those.
273         * However, depending on the rules of your server with respect to the caching
274         * of transactional resources, consider switching this setting to at least
275         * {@link #CACHE_CONNECTION} or {@link #CACHE_SESSION} even in conjunction with an
276         * external transaction manager.
277         * @see #CACHE_NONE
278         * @see #CACHE_CONNECTION
279         * @see #CACHE_SESSION
280         * @see #CACHE_CONSUMER
281         * @see #setCacheLevelName
282         * @see #setTransactionManager
283         */
284        public void setCacheLevel(int cacheLevel) {
285                this.cacheLevel = cacheLevel;
286        }
287
288        /**
289         * Return the level of caching that this listener container is allowed to apply.
290         */
291        public int getCacheLevel() {
292                return this.cacheLevel;
293        }
294
295
296        /**
297         * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
298         * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
299         * <p>This listener container will always hold on to the minimum number of consumers
300         * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
301         * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
302         */
303        @Override
304        public void setConcurrency(String concurrency) {
305                try {
306                        int separatorIndex = concurrency.indexOf('-');
307                        if (separatorIndex != -1) {
308                                setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
309                                setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
310                        }
311                        else {
312                                setConcurrentConsumers(1);
313                                setMaxConcurrentConsumers(Integer.parseInt(concurrency));
314                        }
315                }
316                catch (NumberFormatException ex) {
317                        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
318                                        "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
319                }
320        }
321
322        /**
323         * Specify the number of concurrent consumers to create. Default is 1.
324         * <p>Specifying a higher value for this setting will increase the standard
325         * level of scheduled concurrent consumers at runtime: This is effectively
326         * the minimum number of concurrent consumers which will be scheduled
327         * at any given time. This is a static setting; for dynamic scaling,
328         * consider specifying the "maxConcurrentConsumers" setting instead.
329         * <p>Raising the number of concurrent consumers is recommendable in order
330         * to scale the consumption of messages coming in from a queue. However,
331         * note that any ordering guarantees are lost once multiple consumers are
332         * registered. In general, stick with 1 consumer for low-volume queues.
333         * <p><b>Do not raise the number of concurrent consumers for a topic,
334         * unless vendor-specific setup measures clearly allow for it.</b>
335         * With regular setup, this would lead to concurrent consumption
336         * of the same message, which is hardly ever desirable.
337         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
338         * @see #setMaxConcurrentConsumers
339         */
340        public void setConcurrentConsumers(int concurrentConsumers) {
341                Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
342                synchronized (this.lifecycleMonitor) {
343                        this.concurrentConsumers = concurrentConsumers;
344                        if (this.maxConcurrentConsumers < concurrentConsumers) {
345                                this.maxConcurrentConsumers = concurrentConsumers;
346                        }
347                }
348        }
349
350        /**
351         * Return the "concurrentConsumer" setting.
352         * <p>This returns the currently configured "concurrentConsumers" value;
353         * the number of currently scheduled/active consumers might differ.
354         * @see #getScheduledConsumerCount()
355         * @see #getActiveConsumerCount()
356         */
357        public final int getConcurrentConsumers() {
358                synchronized (this.lifecycleMonitor) {
359                        return this.concurrentConsumers;
360                }
361        }
362
363        /**
364         * Specify the maximum number of concurrent consumers to create. Default is 1.
365         * <p>If this setting is higher than "concurrentConsumers", the listener container
366         * will dynamically schedule new consumers at runtime, provided that enough
367         * incoming messages are encountered. Once the load goes down again, the number of
368         * consumers will be reduced to the standard level ("concurrentConsumers") again.
369         * <p>Raising the number of concurrent consumers is recommendable in order
370         * to scale the consumption of messages coming in from a queue. However,
371         * note that any ordering guarantees are lost once multiple consumers are
372         * registered. In general, stick with 1 consumer for low-volume queues.
373         * <p><b>Do not raise the number of concurrent consumers for a topic,
374         * unless vendor-specific setup measures clearly allow for it.</b>
375         * With regular setup, this would lead to concurrent consumption
376         * of the same message, which is hardly ever desirable.
377         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
378         * @see #setConcurrentConsumers
379         */
380        public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
381                Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)");
382                synchronized (this.lifecycleMonitor) {
383                        this.maxConcurrentConsumers = Math.max(maxConcurrentConsumers, this.concurrentConsumers);
384                }
385        }
386
387        /**
388         * Return the "maxConcurrentConsumer" setting.
389         * <p>This returns the currently configured "maxConcurrentConsumers" value;
390         * the number of currently scheduled/active consumers might differ.
391         * @see #getScheduledConsumerCount()
392         * @see #getActiveConsumerCount()
393         */
394        public final int getMaxConcurrentConsumers() {
395                synchronized (this.lifecycleMonitor) {
396                        return this.maxConcurrentConsumers;
397                }
398        }
399
400        /**
401         * Specify the maximum number of messages to process in one task.
402         * More concretely, this limits the number of message reception attempts
403         * per task, which includes receive iterations that did not actually
404         * pick up a message until they hit their timeout (see the
405         * {@link #setReceiveTimeout "receiveTimeout"} property).
406         * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
407         * reusing the original invoker threads until shutdown (at the
408         * expense of limited dynamic scheduling).
409         * <p>In case of a SchedulingTaskExecutor indicating a preference for
410         * short-lived tasks, the default is 10 instead. Specify a number
411         * of 10 to 100 messages to balance between rather long-lived and
412         * rather short-lived tasks here.
413         * <p>Long-lived tasks avoid frequent thread context switches through
414         * sticking with the same thread all the way through, while short-lived
415         * tasks allow thread pools to control the scheduling. Hence, thread
416         * pools will usually prefer short-lived tasks.
417         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
418         * @see #setTaskExecutor
419         * @see #setReceiveTimeout
420         * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
421         */
422        public void setMaxMessagesPerTask(int maxMessagesPerTask) {
423                Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
424                synchronized (this.lifecycleMonitor) {
425                        this.maxMessagesPerTask = maxMessagesPerTask;
426                }
427        }
428
429        /**
430         * Return the maximum number of messages to process in one task.
431         */
432        public final int getMaxMessagesPerTask() {
433                synchronized (this.lifecycleMonitor) {
434                        return this.maxMessagesPerTask;
435                }
436        }
437
438        /**
439         * Specify the limit for the number of consumers that are allowed to be idle
440         * at any given time.
441         * <p>This limit is used by the {@link #scheduleNewInvokerIfAppropriate} method
442         * to determine if a new invoker should be created. Increasing the limit causes
443         * invokers to be created more aggressively. This can be useful to ramp up the
444         * number of invokers faster.
445         * <p>The default is 1, only scheduling a new invoker (which is likely to
446         * be idle initially) if none of the existing invokers is currently idle.
447         */
448        public void setIdleConsumerLimit(int idleConsumerLimit) {
449                Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher");
450                synchronized (this.lifecycleMonitor) {
451                        this.idleConsumerLimit = idleConsumerLimit;
452                }
453        }
454
455        /**
456         * Return the limit for the number of idle consumers.
457         */
458        public final int getIdleConsumerLimit() {
459                synchronized (this.lifecycleMonitor) {
460                        return this.idleConsumerLimit;
461                }
462        }
463
464        /**
465         * Specify the limit for idle executions of a consumer task, not having
466         * received any message within its execution. If this limit is reached,
467         * the task will shut down and leave receiving to other executing tasks.
468         * <p>The default is 1, closing idle resources early once a task didn't
469         * receive a message. This applies to dynamic scheduling only; see the
470         * {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting.
471         * The minimum number of consumers
472         * (see {@link #setConcurrentConsumers "concurrentConsumers"})
473         * will be kept around until shutdown in any case.
474         * <p>Within each task execution, a number of message reception attempts
475         * (according to the "maxMessagesPerTask" setting) will each wait for an incoming
476         * message (according to the "receiveTimeout" setting). If all of those receive
477         * attempts in a given task return without a message, the task is considered
478         * idle with respect to received messages. Such a task may still be rescheduled;
479         * however, once it reached the specified "idleTaskExecutionLimit", it will
480         * shut down (in case of dynamic scaling).
481         * <p>Raise this limit if you encounter too frequent scaling up and down.
482         * With this limit being higher, an idle consumer will be kept around longer,
483         * avoiding the restart of a consumer once a new load of messages comes in.
484         * Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value,
485         * which will also lead to idle consumers being kept around for a longer time
486         * (while also increasing the average execution time of each scheduled task).
487         * <p><b>This setting can be modified at runtime, for example through JMX.</b>
488         * @see #setMaxMessagesPerTask
489         * @see #setReceiveTimeout
490         */
491        public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) {
492                Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher");
493                synchronized (this.lifecycleMonitor) {
494                        this.idleTaskExecutionLimit = idleTaskExecutionLimit;
495                }
496        }
497
498        /**
499         * Return the limit for idle executions of a consumer task.
500         */
501        public final int getIdleTaskExecutionLimit() {
502                synchronized (this.lifecycleMonitor) {
503                        return this.idleTaskExecutionLimit;
504                }
505        }
506
507
508        //-------------------------------------------------------------------------
509        // Implementation of AbstractMessageListenerContainer's template methods
510        //-------------------------------------------------------------------------
511
512        @Override
513        public void initialize() {
514                // Adapt default cache level.
515                if (this.cacheLevel == CACHE_AUTO) {
516                        this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
517                }
518
519                // Prepare taskExecutor and maxMessagesPerTask.
520                synchronized (this.lifecycleMonitor) {
521                        if (this.taskExecutor == null) {
522                                this.taskExecutor = createDefaultTaskExecutor();
523                        }
524                        else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
525                                        ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
526                                        this.maxMessagesPerTask == Integer.MIN_VALUE) {
527                                // TaskExecutor indicated a preference for short-lived tasks. According to
528                                // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
529                                // unless the user specified a custom value.
530                                this.maxMessagesPerTask = 10;
531                        }
532                }
533
534                // Proceed with actual listener initialization.
535                super.initialize();
536        }
537
538        /**
539         * Creates the specified number of concurrent consumers,
540         * in the form of a JMS Session plus associated MessageConsumer
541         * running in a separate thread.
542         * @see #scheduleNewInvoker
543         * @see #setTaskExecutor
544         */
545        @Override
546        protected void doInitialize() throws JMSException {
547                synchronized (this.lifecycleMonitor) {
548                        for (int i = 0; i < this.concurrentConsumers; i++) {
549                                scheduleNewInvoker();
550                        }
551                }
552        }
553
554        /**
555         * Destroy the registered JMS Sessions and associated MessageConsumers.
556         */
557        @Override
558        protected void doShutdown() throws JMSException {
559                logger.debug("Waiting for shutdown of message listener invokers");
560                try {
561                        synchronized (this.lifecycleMonitor) {
562                                int waitCount = 0;
563                                while (this.activeInvokerCount > 0) {
564                                        if (logger.isDebugEnabled()) {
565                                                logger.debug("Still waiting for shutdown of " + this.activeInvokerCount +
566                                                                " message listener invokers (iteration " + waitCount + ")");
567                                        }
568                                        // Wait for AsyncMessageListenerInvokers to deactivate themselves...
569                                        long timeout = getReceiveTimeout();
570                                        if (timeout > 0) {
571                                                this.lifecycleMonitor.wait(timeout);
572                                        }
573                                        else {
574                                                this.lifecycleMonitor.wait();
575                                        }
576                                        waitCount++;
577                                }
578                                // Clear remaining scheduled invokers, possibly left over as paused tasks
579                                for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) {
580                                        scheduledInvoker.clearResources();
581                                }
582                                this.scheduledInvokers.clear();
583                        }
584                }
585                catch (InterruptedException ex) {
586                        // Re-interrupt current thread, to allow other threads to react.
587                        Thread.currentThread().interrupt();
588                }
589        }
590
591        /**
592         * Overridden to reset the stop callback, if any.
593         */
594        @Override
595        public void start() throws JmsException {
596                synchronized (this.lifecycleMonitor) {
597                        this.stopCallback = null;
598                }
599                super.start();
600        }
601
602        /**
603         * Stop this listener container, invoking the specific callback
604         * once all listener processing has actually stopped.
605         * <p>Note: Further {@code stop(runnable)} calls (before processing
606         * has actually stopped) will override the specified callback. Only the
607         * latest specified callback will be invoked.
608         * <p>If a subsequent {@link #start()} call restarts the listener container
609         * before it has fully stopped, the callback will not get invoked at all.
610         * @param callback the callback to invoke once listener processing
611         * has fully stopped
612         * @throws JmsException if stopping failed
613         * @see #stop()
614         */
615        @Override
616        public void stop(Runnable callback) throws JmsException {
617                synchronized (this.lifecycleMonitor) {
618                        if (!isRunning() || this.stopCallback != null) {
619                                // Not started, already stopped, or previous stop attempt in progress
620                                // -> return immediately, no stop process to control anymore.
621                                callback.run();
622                                return;
623                        }
624                        this.stopCallback = callback;
625                }
626                stop();
627        }
628
629        /**
630         * Return the number of currently scheduled consumers.
631         * <p>This number will always be between "concurrentConsumers" and
632         * "maxConcurrentConsumers", but might be higher than "activeConsumerCount"
633         * (in case some consumers are scheduled but not executing at the moment).
634         * @see #getConcurrentConsumers()
635         * @see #getMaxConcurrentConsumers()
636         * @see #getActiveConsumerCount()
637         */
638        public final int getScheduledConsumerCount() {
639                synchronized (this.lifecycleMonitor) {
640                        return this.scheduledInvokers.size();
641                }
642        }
643
644        /**
645         * Return the number of currently active consumers.
646         * <p>This number will always be between "concurrentConsumers" and
647         * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount"
648         * (in case some consumers are scheduled but not executing at the moment).
649         * @see #getConcurrentConsumers()
650         * @see #getMaxConcurrentConsumers()
651         * @see #getActiveConsumerCount()
652         */
653        public final int getActiveConsumerCount() {
654                synchronized (this.lifecycleMonitor) {
655                        return this.activeInvokerCount;
656                }
657        }
658
659        /**
660         * Return whether at least one consumer has entered a fixed registration with the
661         * target destination. This is particularly interesting for the pub-sub case where
662         * it might be important to have an actual consumer registered that is guaranteed
663         * not to miss any messages that are just about to be published.
664         * <p>This method may be polled after a {@link #start()} call, until asynchronous
665         * registration of consumers has happened which is when the method will start returning
666         * {@code true} &ndash; provided that the listener container ever actually establishes
667         * a fixed registration. It will then keep returning {@code true} until shutdown,
668         * since the container will hold on to at least one consumer registration thereafter.
669         * <p>Note that a listener container is not bound to having a fixed registration in
670         * the first place. It may also keep recreating consumers for every invoker execution.
671         * This particularly depends on the {@link #setCacheLevel cache level} setting:
672         * only {@link #CACHE_CONSUMER} will lead to a fixed registration.
673         */
674        public boolean isRegisteredWithDestination() {
675                synchronized (this.lifecycleMonitor) {
676                        return (this.registeredWithDestination > 0);
677                }
678        }
679
680
681        /**
682         * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
683         * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
684         * with the specified bean name (or the class name, if no bean name specified) as thread name prefix.
685         * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
686         */
687        protected TaskExecutor createDefaultTaskExecutor() {
688                String beanName = getBeanName();
689                String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
690                return new SimpleAsyncTaskExecutor(threadNamePrefix);
691        }
692
693        /**
694         * Schedule a new invoker, increasing the total number of scheduled
695         * invokers for this listener container.
696         */
697        private void scheduleNewInvoker() {
698                AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
699                if (rescheduleTaskIfNecessary(invoker)) {
700                        // This should always be true, since we're only calling this when active.
701                        this.scheduledInvokers.add(invoker);
702                }
703        }
704
705        /**
706         * Use a shared JMS Connection depending on the "cacheLevel" setting.
707         * @see #setCacheLevel
708         * @see #CACHE_CONNECTION
709         */
710        @Override
711        protected final boolean sharedConnectionEnabled() {
712                return (getCacheLevel() >= CACHE_CONNECTION);
713        }
714
715        /**
716         * Re-executes the given task via this listener container's TaskExecutor.
717         * @see #setTaskExecutor
718         */
719        @Override
720        protected void doRescheduleTask(Object task) {
721                this.taskExecutor.execute((Runnable) task);
722        }
723
724        /**
725         * Tries scheduling a new invoker, since we know messages are coming in...
726         * @see #scheduleNewInvokerIfAppropriate()
727         */
728        @Override
729        protected void messageReceived(Object invoker, Session session) {
730                ((AsyncMessageListenerInvoker) invoker).setIdle(false);
731                scheduleNewInvokerIfAppropriate();
732        }
733
734        /**
735         * Marks the affected invoker as idle.
736         */
737        @Override
738        protected void noMessageReceived(Object invoker, Session session) {
739                ((AsyncMessageListenerInvoker) invoker).setIdle(true);
740        }
741
742        /**
743         * Schedule a new invoker, increasing the total number of scheduled
744         * invokers for this listener container, but only if the specified
745         * "maxConcurrentConsumers" limit has not been reached yet, and only
746         * if the specified "idleConsumerLimit" has not been reached either.
747         * <p>Called once a message has been received, in order to scale up while
748         * processing the message in the invoker that originally received it.
749         * @see #setTaskExecutor
750         * @see #getMaxConcurrentConsumers()
751         * @see #getIdleConsumerLimit()
752         */
753        protected void scheduleNewInvokerIfAppropriate() {
754                if (isRunning()) {
755                        resumePausedTasks();
756                        synchronized (this.lifecycleMonitor) {
757                                if (this.scheduledInvokers.size() < this.maxConcurrentConsumers &&
758                                                getIdleInvokerCount() < this.idleConsumerLimit) {
759                                        scheduleNewInvoker();
760                                        if (logger.isDebugEnabled()) {
761                                                logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size());
762                                        }
763                                }
764                        }
765                }
766        }
767
768        /**
769         * Determine whether the current invoker should be rescheduled,
770         * given that it might not have received a message in a while.
771         * @param idleTaskExecutionCount the number of idle executions
772         * that this invoker task has already accumulated (in a row)
773         */
774        private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) {
775                boolean superfluous =
776                                (idleTaskExecutionCount >= this.idleTaskExecutionLimit && getIdleInvokerCount() > 1);
777                return (this.scheduledInvokers.size() <=
778                                (superfluous ? this.concurrentConsumers : this.maxConcurrentConsumers));
779        }
780
781        /**
782         * Determine whether this listener container currently has more
783         * than one idle instance among its scheduled invokers.
784         */
785        private int getIdleInvokerCount() {
786                int count = 0;
787                for (AsyncMessageListenerInvoker invoker : this.scheduledInvokers) {
788                        if (invoker.isIdle()) {
789                                count++;
790                        }
791                }
792                return count;
793        }
794
795
796        /**
797         * Overridden to accept a failure in the initial setup - leaving it up to the
798         * asynchronous invokers to establish the shared Connection on first access.
799         * @see #refreshConnectionUntilSuccessful()
800         */
801        @Override
802        protected void establishSharedConnection() {
803                try {
804                        super.establishSharedConnection();
805                }
806                catch (Exception ex) {
807                        if (ex instanceof JMSException) {
808                                invokeExceptionListener((JMSException) ex);
809                        }
810                        logger.debug("Could not establish shared JMS Connection - " +
811                                        "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex);
812                }
813        }
814
815        /**
816         * This implementations proceeds even after an exception thrown from
817         * {@code Connection.start()}, relying on listeners to perform
818         * appropriate recovery.
819         */
820        @Override
821        protected void startSharedConnection() {
822                try {
823                        super.startSharedConnection();
824                }
825                catch (Exception ex) {
826                        logger.debug("Connection start failed - relying on listeners to perform recovery", ex);
827                }
828        }
829
830        /**
831         * This implementations proceeds even after an exception thrown from
832         * {@code Connection.stop()}, relying on listeners to perform
833         * appropriate recovery after a restart.
834         */
835        @Override
836        protected void stopSharedConnection() {
837                try {
838                        super.stopSharedConnection();
839                }
840                catch (Exception ex) {
841                        logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex);
842                }
843        }
844
845        /**
846         * Handle the given exception that arose during setup of a listener.
847         * Called for every such exception in every concurrent listener.
848         * <p>The default implementation logs the exception at warn level
849         * if not recovered yet, and at debug level if already recovered.
850         * Can be overridden in subclasses.
851         * @param ex the exception to handle
852         * @param alreadyRecovered whether a previously executing listener
853         * already recovered from the present listener setup failure
854         * (this usually indicates a follow-up failure than can be ignored
855         * other than for debug log purposes)
856         * @see #recoverAfterListenerSetupFailure()
857         */
858        protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) {
859                if (ex instanceof JMSException) {
860                        invokeExceptionListener((JMSException) ex);
861                }
862                if (ex instanceof SharedConnectionNotInitializedException) {
863                        if (!alreadyRecovered) {
864                                logger.info("JMS message listener invoker needs to establish shared Connection");
865                        }
866                }
867                else {
868                        // Recovery during active operation..
869                        if (alreadyRecovered) {
870                                logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex);
871                        }
872                        else {
873                                StringBuilder msg = new StringBuilder();
874                                msg.append("Setup of JMS message listener invoker failed for destination '");
875                                msg.append(getDestinationDescription()).append("' - trying to recover. Cause: ");
876                                msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
877                                if (logger.isDebugEnabled()) {
878                                        logger.warn(msg, ex);
879                                }
880                                else {
881                                        logger.warn(msg);
882                                }
883                        }
884                }
885        }
886
887        /**
888         * Recover this listener container after a listener failed to set itself up,
889         * for example re-establishing the underlying Connection.
890         * <p>The default implementation delegates to DefaultMessageListenerContainer's
891         * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will
892         * try to re-establish a Connection to the JMS provider both for the shared
893         * and the non-shared Connection case.
894         * @see #refreshConnectionUntilSuccessful()
895         * @see #refreshDestination()
896         */
897        protected void recoverAfterListenerSetupFailure() {
898                this.recovering = true;
899                try {
900                        refreshConnectionUntilSuccessful();
901                        refreshDestination();
902                }
903                finally {
904                        this.recovering = false;
905                        this.interrupted = false;
906                }
907        }
908
909        /**
910         * Refresh the underlying Connection, not returning before an attempt has been
911         * successful. Called in case of a shared Connection as well as without shared
912         * Connection, so either needs to operate on the shared Connection or on a
913         * temporary Connection that just gets established for validation purposes.
914         * <p>The default implementation retries until it successfully established a
915         * Connection, for as long as this message listener container is running.
916         * Applies the specified recovery interval between retries.
917         * @see #setRecoveryInterval
918         * @see #start()
919         * @see #stop()
920         */
921        protected void refreshConnectionUntilSuccessful() {
922                BackOffExecution execution = this.backOff.start();
923                while (isRunning()) {
924                        try {
925                                if (sharedConnectionEnabled()) {
926                                        refreshSharedConnection();
927                                }
928                                else {
929                                        Connection con = createConnection();
930                                        JmsUtils.closeConnection(con);
931                                }
932                                logger.info("Successfully refreshed JMS Connection");
933                                break;
934                        }
935                        catch (Exception ex) {
936                                if (ex instanceof JMSException) {
937                                        invokeExceptionListener((JMSException) ex);
938                                }
939                                StringBuilder msg = new StringBuilder();
940                                msg.append("Could not refresh JMS Connection for destination '");
941                                msg.append(getDestinationDescription()).append("' - retrying using ");
942                                msg.append(execution).append(". Cause: ");
943                                msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage());
944                                if (logger.isDebugEnabled()) {
945                                        logger.error(msg, ex);
946                                }
947                                else {
948                                        logger.error(msg);
949                                }
950                        }
951                        if (!applyBackOffTime(execution)) {
952                                StringBuilder msg = new StringBuilder();
953                                msg.append("Stopping container for destination '")
954                                                .append(getDestinationDescription())
955                                                .append("': back-off policy does not allow ").append("for further attempts.");
956                                logger.error(msg.toString());
957                                stop();
958                        }
959                }
960        }
961
962        /**
963         * Refresh the JMS destination that this listener container operates on.
964         * <p>Called after listener setup failure, assuming that a cached Destination
965         * object might have become invalid (a typical case on WebLogic JMS).
966         * <p>The default implementation removes the destination from a
967         * DestinationResolver's cache, in case of a CachingDestinationResolver.
968         * @see #setDestinationName
969         * @see org.springframework.jms.support.destination.CachingDestinationResolver
970         */
971        protected void refreshDestination() {
972                String destName = getDestinationName();
973                if (destName != null) {
974                        DestinationResolver destResolver = getDestinationResolver();
975                        if (destResolver instanceof CachingDestinationResolver) {
976                                ((CachingDestinationResolver) destResolver).removeFromCache(destName);
977                        }
978                }
979        }
980
981        /**
982         * Apply the next back-off time using the specified {@link BackOffExecution}.
983         * <p>Return {@code true} if the back-off period has been applied and a new
984         * attempt to recover should be made, {@code false} if no further attempt
985         * should be made.
986         * @since 4.1
987         */
988        protected boolean applyBackOffTime(BackOffExecution execution) {
989                if (this.recovering && this.interrupted) {
990                        // Interrupted right before and still failing... give up.
991                        return false;
992                }
993                long interval = execution.nextBackOff();
994                if (interval == BackOffExecution.STOP) {
995                        return false;
996                }
997                else {
998                        try {
999                                synchronized (this.lifecycleMonitor) {
1000                                        this.lifecycleMonitor.wait(interval);
1001                                }
1002                        }
1003                        catch (InterruptedException interEx) {
1004                                // Re-interrupt current thread, to allow other threads to react.
1005                                Thread.currentThread().interrupt();
1006                                if (this.recovering) {
1007                                        this.interrupted = true;
1008                                }
1009                        }
1010                        return true;
1011                }
1012        }
1013
1014        /**
1015         * Return whether this listener container is currently in a recovery attempt.
1016         * <p>May be used to detect recovery phases but also the end of a recovery phase,
1017         * with {@code isRecovering()} switching to {@code false} after having been found
1018         * to return {@code true} before.
1019         * @see #recoverAfterListenerSetupFailure()
1020         */
1021        public final boolean isRecovering() {
1022                return this.recovering;
1023        }
1024
1025
1026        //-------------------------------------------------------------------------
1027        // Inner classes used as internal adapters
1028        //-------------------------------------------------------------------------
1029
1030        /**
1031         * Runnable that performs looped {@code MessageConsumer.receive()} calls.
1032         */
1033        private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {
1034
1035                private Session session;
1036
1037                private MessageConsumer consumer;
1038
1039                private Object lastRecoveryMarker;
1040
1041                private boolean lastMessageSucceeded;
1042
1043                private int idleTaskExecutionCount = 0;
1044
1045                private volatile boolean idle = true;
1046
1047                @Override
1048                public void run() {
1049                        synchronized (lifecycleMonitor) {
1050                                activeInvokerCount++;
1051                                lifecycleMonitor.notifyAll();
1052                        }
1053                        boolean messageReceived = false;
1054                        try {
1055                                if (maxMessagesPerTask < 0) {
1056                                        messageReceived = executeOngoingLoop();
1057                                }
1058                                else {
1059                                        int messageCount = 0;
1060                                        while (isRunning() && messageCount < maxMessagesPerTask) {
1061                                                messageReceived = (invokeListener() || messageReceived);
1062                                                messageCount++;
1063                                        }
1064                                }
1065                        }
1066                        catch (Throwable ex) {
1067                                clearResources();
1068                                if (!this.lastMessageSucceeded) {
1069                                        // We failed more than once in a row or on startup -
1070                                        // wait before first recovery attempt.
1071                                        waitBeforeRecoveryAttempt();
1072                                }
1073                                this.lastMessageSucceeded = false;
1074                                boolean alreadyRecovered = false;
1075                                synchronized (recoveryMonitor) {
1076                                        if (this.lastRecoveryMarker == currentRecoveryMarker) {
1077                                                handleListenerSetupFailure(ex, false);
1078                                                recoverAfterListenerSetupFailure();
1079                                                currentRecoveryMarker = new Object();
1080                                        }
1081                                        else {
1082                                                alreadyRecovered = true;
1083                                        }
1084                                }
1085                                if (alreadyRecovered) {
1086                                        handleListenerSetupFailure(ex, true);
1087                                }
1088                        }
1089                        finally {
1090                                synchronized (lifecycleMonitor) {
1091                                        decreaseActiveInvokerCount();
1092                                        lifecycleMonitor.notifyAll();
1093                                }
1094                                if (!messageReceived) {
1095                                        this.idleTaskExecutionCount++;
1096                                }
1097                                else {
1098                                        this.idleTaskExecutionCount = 0;
1099                                }
1100                                synchronized (lifecycleMonitor) {
1101                                        if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
1102                                                // We're shutting down completely.
1103                                                scheduledInvokers.remove(this);
1104                                                if (logger.isDebugEnabled()) {
1105                                                        logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
1106                                                }
1107                                                lifecycleMonitor.notifyAll();
1108                                                clearResources();
1109                                        }
1110                                        else if (isRunning()) {
1111                                                int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
1112                                                if (nonPausedConsumers < 1) {
1113                                                        logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
1114                                                                        "Check your thread pool configuration! Manual recovery necessary through a start() call.");
1115                                                }
1116                                                else if (nonPausedConsumers < getConcurrentConsumers()) {
1117                                                        logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
1118                                                                        "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
1119                                                                        "to be triggered by remaining consumers.");
1120                                                }
1121                                        }
1122                                }
1123                        }
1124                }
1125
1126                private boolean executeOngoingLoop() throws JMSException {
1127                        boolean messageReceived = false;
1128                        boolean active = true;
1129                        while (active) {
1130                                synchronized (lifecycleMonitor) {
1131                                        boolean interrupted = false;
1132                                        boolean wasWaiting = false;
1133                                        while ((active = isActive()) && !isRunning()) {
1134                                                if (interrupted) {
1135                                                        throw new IllegalStateException("Thread was interrupted while waiting for " +
1136                                                                        "a restart of the listener container, but container is still stopped");
1137                                                }
1138                                                if (!wasWaiting) {
1139                                                        decreaseActiveInvokerCount();
1140                                                }
1141                                                wasWaiting = true;
1142                                                try {
1143                                                        lifecycleMonitor.wait();
1144                                                }
1145                                                catch (InterruptedException ex) {
1146                                                        // Re-interrupt current thread, to allow other threads to react.
1147                                                        Thread.currentThread().interrupt();
1148                                                        interrupted = true;
1149                                                }
1150                                        }
1151                                        if (wasWaiting) {
1152                                                activeInvokerCount++;
1153                                        }
1154                                        if (scheduledInvokers.size() > maxConcurrentConsumers) {
1155                                                active = false;
1156                                        }
1157                                }
1158                                if (active) {
1159                                        messageReceived = (invokeListener() || messageReceived);
1160                                }
1161                        }
1162                        return messageReceived;
1163                }
1164
1165                private boolean invokeListener() throws JMSException {
1166                        initResourcesIfNecessary();
1167                        boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
1168                        this.lastMessageSucceeded = true;
1169                        return messageReceived;
1170                }
1171
1172                private void decreaseActiveInvokerCount() {
1173                        activeInvokerCount--;
1174                        if (stopCallback != null && activeInvokerCount == 0) {
1175                                stopCallback.run();
1176                                stopCallback = null;
1177                        }
1178                }
1179
1180                private void initResourcesIfNecessary() throws JMSException {
1181                        if (getCacheLevel() <= CACHE_CONNECTION) {
1182                                updateRecoveryMarker();
1183                        }
1184                        else {
1185                                if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
1186                                        updateRecoveryMarker();
1187                                        this.session = createSession(getSharedConnection());
1188                                }
1189                                if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
1190                                        this.consumer = createListenerConsumer(this.session);
1191                                        synchronized (lifecycleMonitor) {
1192                                                registeredWithDestination++;
1193                                        }
1194                                }
1195                        }
1196                }
1197
1198                private void updateRecoveryMarker() {
1199                        synchronized (recoveryMonitor) {
1200                                this.lastRecoveryMarker = currentRecoveryMarker;
1201                        }
1202                }
1203
1204                private void clearResources() {
1205                        if (sharedConnectionEnabled()) {
1206                                synchronized (sharedConnectionMonitor) {
1207                                        JmsUtils.closeMessageConsumer(this.consumer);
1208                                        JmsUtils.closeSession(this.session);
1209                                }
1210                        }
1211                        else {
1212                                JmsUtils.closeMessageConsumer(this.consumer);
1213                                JmsUtils.closeSession(this.session);
1214                        }
1215                        if (this.consumer != null) {
1216                                synchronized (lifecycleMonitor) {
1217                                        registeredWithDestination--;
1218                                }
1219                        }
1220                        this.consumer = null;
1221                        this.session = null;
1222                }
1223
1224                /**
1225                 * Apply the back-off time once. In a regular scenario, the back-off is only applied if we
1226                 * failed to recover with the broker. This additional wait period avoids a burst retry
1227                 * scenario when the broker is actually up but something else if failing (i.e. listener
1228                 * specific).
1229                 */
1230                private void waitBeforeRecoveryAttempt() {
1231                        BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
1232                        applyBackOffTime(execution);
1233                }
1234
1235                @Override
1236                public boolean isLongLived() {
1237                        return (maxMessagesPerTask < 0);
1238                }
1239
1240                public void setIdle(boolean idle) {
1241                        this.idle = idle;
1242                }
1243
1244                public boolean isIdle() {
1245                        return this.idle;
1246                }
1247        }
1248
1249}