/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.jms.listener;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.springframework.core.Constants;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.JmsException;
import org.springframework.jms.support.JmsUtils;
import org.springframework.jms.support.destination.CachingDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.BackOffExecution;
import org.springframework.util.backoff.FixedBackOff;

/**
 * Message listener container variant that uses plain JMS client APIs, specifically
 * a loop of {@code MessageConsumer.receive()} calls that also allow for
 * transactional reception of messages (registering them with XA transactions).
 * Designed to work in a native JMS environment as well as in a Java EE environment,
 * with only minimal differences in configuration.
 *
 * <p>This is a simple but nevertheless powerful form of message listener container.
 * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
 * and optionally allows for dynamic adaptation at runtime (up to a maximum number).
 * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
 * of runtime complexity, in particular the minimal requirements on the JMS provider:
 * not even the JMS {@code ServerSessionPool} facility is required. Beyond that, it is
 * fully self-recovering in case the broker is temporarily unavailable, and allows
 * for stops/restarts as well as runtime changes to its configuration.
 *
 * <p>Actual {@code MessageListener} execution happens in asynchronous work units which are
 * created through Spring's {@link org.springframework.core.task.TaskExecutor TaskExecutor}
 * abstraction. By default, the specified number of invoker tasks will be created
 * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
 * setting. Specify an alternative {@code TaskExecutor} to integrate with an existing
 * thread pool facility (such as a Java EE server's), for example using a
 * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
 * With a native JMS setup, each of those listener threads is going to use a
 * cached JMS {@code Session} and {@code MessageConsumer} (only refreshed in case
 * of failure), using the JMS provider's resources as efficiently as possible.
 *
 * <p>Message reception and listener execution can automatically be wrapped
 * in transactions by passing a Spring
 * {@link org.springframework.transaction.PlatformTransactionManager} into the
 * {@link #setTransactionManager "transactionManager"} property. This will usually
 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
 * Java EE environment, in combination with a JTA-aware JMS {@code ConnectionFactory}
 * obtained from JNDI (check your Java EE server's documentation). Note that this
 * listener container will automatically reobtain all JMS handles for each transaction
 * in case an external transaction manager is specified, for compatibility with
 * all Java EE servers (in particular JBoss). This non-caching behavior can be
 * overridden through the {@link #setCacheLevel "cacheLevel"} /
 * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching of
 * the {@code Connection} (or also {@code Session} and {@code MessageConsumer})
 * even if an external transaction manager is involved.
 *
 * <p>Dynamic scaling of the number of concurrent invokers can be activated
 * by specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
 * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
 * value. Since the latter's default is 1, you can also simply specify a
 * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
 * 5 concurrent consumers in case of increasing message load, as well as dynamic
 * shrinking back to the standard number of consumers once the load decreases.
 * Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
 * setting to control the lifespan of each new task, to avoid frequent scaling up
 * and down, in particular if the {@code ConnectionFactory} does not pool JMS
 * {@code Sessions} and/or the {@code TaskExecutor} does not pool threads (check
 * your configuration!). Note that dynamic scaling only really makes sense for a
 * queue in the first place; for a topic, you will typically stick with the default
 * number of 1 consumer, otherwise you'd receive the same message multiple times on
 * the same node.
 *
 * <p><b>Note: Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
 * in combination with dynamic scaling.</b> Ideally, don't use it with a message
 * listener container at all, since it is generally preferable to let the
 * listener container itself handle appropriate caching within its lifecycle.
 * Also, stopping and restarting a listener container will only work with an
 * independent, locally cached Connection - not with an externally cached one.
 *
 * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
 * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
 * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
 * javadoc for details on acknowledge modes and native transaction options, as
 * well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
 * on configuring an external transaction manager. Note that for the default
 * "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
 * before listener execution, with no redelivery in case of an exception.
117 * 118 * @author Juergen Hoeller 119 * @since 2.0 120 * @see #setTransactionManager 121 * @see #setCacheLevel 122 * @see javax.jms.MessageConsumer#receive(long) 123 * @see SimpleMessageListenerContainer 124 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager 125 */ 126public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer { 127 128 /** 129 * Default thread name prefix: "DefaultMessageListenerContainer-". 130 */ 131 public static final String DEFAULT_THREAD_NAME_PREFIX = 132 ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-"; 133 134 /** 135 * The default recovery interval: 5000 ms = 5 seconds. 136 */ 137 public static final long DEFAULT_RECOVERY_INTERVAL = 5000; 138 139 140 /** 141 * Constant that indicates to cache no JMS resources at all. 142 * @see #setCacheLevel 143 */ 144 public static final int CACHE_NONE = 0; 145 146 /** 147 * Constant that indicates to cache a shared JMS {@code Connection} for each 148 * listener thread. 149 * @see #setCacheLevel 150 */ 151 public static final int CACHE_CONNECTION = 1; 152 153 /** 154 * Constant that indicates to cache a shared JMS {@code Connection} and a JMS 155 * {@code Session} for each listener thread. 156 * @see #setCacheLevel 157 */ 158 public static final int CACHE_SESSION = 2; 159 160 /** 161 * Constant that indicates to cache a shared JMS {@code Connection}, a JMS 162 * {@code Session}, and a JMS MessageConsumer for each listener thread. 163 * @see #setCacheLevel 164 */ 165 public static final int CACHE_CONSUMER = 3; 166 167 /** 168 * Constant that indicates automatic choice of an appropriate caching level 169 * (depending on the transaction management strategy). 
170 * @see #setCacheLevel 171 */ 172 public static final int CACHE_AUTO = 4; 173 174 175 private static final Constants constants = new Constants(DefaultMessageListenerContainer.class); 176 177 178 @Nullable 179 private Executor taskExecutor; 180 181 private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE); 182 183 private int cacheLevel = CACHE_AUTO; 184 185 private int concurrentConsumers = 1; 186 187 private int maxConcurrentConsumers = 1; 188 189 private int maxMessagesPerTask = Integer.MIN_VALUE; 190 191 private int idleConsumerLimit = 1; 192 193 private int idleTaskExecutionLimit = 1; 194 195 private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<>(); 196 197 private int activeInvokerCount = 0; 198 199 private int registeredWithDestination = 0; 200 201 private volatile boolean recovering = false; 202 203 private volatile boolean interrupted = false; 204 205 @Nullable 206 private Runnable stopCallback; 207 208 private Object currentRecoveryMarker = new Object(); 209 210 private final Object recoveryMonitor = new Object(); 211 212 213 /** 214 * Set the Spring {@code TaskExecutor} to use for running the listener threads. 215 * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}, 216 * starting up a number of new threads, according to the specified number 217 * of concurrent consumers. 218 * <p>Specify an alternative {@code TaskExecutor} for integration with an existing 219 * thread pool. Note that this really only adds value if the threads are 220 * managed in a specific fashion, for example within a Java EE environment. 221 * A plain thread pool does not add much value, as this listener container 222 * will occupy a number of threads for its entire lifetime. 
223 * @see #setConcurrentConsumers 224 * @see org.springframework.core.task.SimpleAsyncTaskExecutor 225 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor 226 */ 227 public void setTaskExecutor(Executor taskExecutor) { 228 this.taskExecutor = taskExecutor; 229 } 230 231 /** 232 * Specify the {@link BackOff} instance to use to compute the interval 233 * between recovery attempts. If the {@link BackOffExecution} implementation 234 * returns {@link BackOffExecution#STOP}, this listener container will not further 235 * attempt to recover. 236 * <p>The {@link #setRecoveryInterval(long) recovery interval} is ignored 237 * when this property is set. 238 * @since 4.1 239 */ 240 public void setBackOff(BackOff backOff) { 241 this.backOff = backOff; 242 } 243 244 /** 245 * Specify the interval between recovery attempts, in <b>milliseconds</b>. 246 * The default is 5000 ms, that is, 5 seconds. This is a convenience method 247 * to create a {@link FixedBackOff} with the specified interval. 248 * <p>For more recovery options, consider specifying a {@link BackOff} 249 * instance instead. 250 * @see #setBackOff(BackOff) 251 * @see #handleListenerSetupFailure 252 */ 253 public void setRecoveryInterval(long recoveryInterval) { 254 this.backOff = new FixedBackOff(recoveryInterval, Long.MAX_VALUE); 255 } 256 257 /** 258 * Specify the level of caching that this listener container is allowed to apply, 259 * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION". 260 * @see #setCacheLevel 261 */ 262 public void setCacheLevelName(String constantName) throws IllegalArgumentException { 263 if (!constantName.startsWith("CACHE_")) { 264 throw new IllegalArgumentException("Only cache constants allowed"); 265 } 266 setCacheLevel(constants.asNumber(constantName).intValue()); 267 } 268 269 /** 270 * Specify the level of caching that this listener container is allowed to apply. 
271 * <p>Default is {@link #CACHE_NONE} if an external transaction manager has been specified 272 * (to reobtain all resources freshly within the scope of the external transaction), 273 * and {@link #CACHE_CONSUMER} otherwise (operating with local JMS resources). 274 * <p>Some Java EE servers only register their JMS resources with an ongoing XA 275 * transaction in case of a freshly obtained JMS {@code Connection} and {@code Session}, 276 * which is why this listener container by default does not cache any of those. 277 * However, depending on the rules of your server with respect to the caching 278 * of transactional resources, consider switching this setting to at least 279 * {@link #CACHE_CONNECTION} or {@link #CACHE_SESSION} even in conjunction with an 280 * external transaction manager. 281 * @see #CACHE_NONE 282 * @see #CACHE_CONNECTION 283 * @see #CACHE_SESSION 284 * @see #CACHE_CONSUMER 285 * @see #setCacheLevelName 286 * @see #setTransactionManager 287 */ 288 public void setCacheLevel(int cacheLevel) { 289 this.cacheLevel = cacheLevel; 290 } 291 292 /** 293 * Return the level of caching that this listener container is allowed to apply. 294 */ 295 public int getCacheLevel() { 296 return this.cacheLevel; 297 } 298 299 300 /** 301 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple 302 * upper limit String, e.g. "10" (the lower limit will be 1 in this case). 303 * <p>This listener container will always hold on to the minimum number of consumers 304 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number 305 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load. 
306 */ 307 @Override 308 public void setConcurrency(String concurrency) { 309 try { 310 int separatorIndex = concurrency.indexOf('-'); 311 if (separatorIndex != -1) { 312 setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex))); 313 setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1))); 314 } 315 else { 316 setConcurrentConsumers(1); 317 setMaxConcurrentConsumers(Integer.parseInt(concurrency)); 318 } 319 } 320 catch (NumberFormatException ex) { 321 throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " + 322 "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported."); 323 } 324 } 325 326 /** 327 * Specify the number of concurrent consumers to create. Default is 1. 328 * <p>Specifying a higher value for this setting will increase the standard 329 * level of scheduled concurrent consumers at runtime: This is effectively 330 * the minimum number of concurrent consumers which will be scheduled 331 * at any given time. This is a static setting; for dynamic scaling, 332 * consider specifying the "maxConcurrentConsumers" setting instead. 333 * <p>Raising the number of concurrent consumers is recommendable in order 334 * to scale the consumption of messages coming in from a queue. However, 335 * note that any ordering guarantees are lost once multiple consumers are 336 * registered. In general, stick with 1 consumer for low-volume queues. 337 * <p><b>Do not raise the number of concurrent consumers for a topic, 338 * unless vendor-specific setup measures clearly allow for it.</b> 339 * With regular setup, this would lead to concurrent consumption 340 * of the same message, which is hardly ever desirable. 
341 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 342 * @see #setMaxConcurrentConsumers 343 */ 344 public void setConcurrentConsumers(int concurrentConsumers) { 345 Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)"); 346 synchronized (this.lifecycleMonitor) { 347 this.concurrentConsumers = concurrentConsumers; 348 if (this.maxConcurrentConsumers < concurrentConsumers) { 349 this.maxConcurrentConsumers = concurrentConsumers; 350 } 351 } 352 } 353 354 /** 355 * Return the "concurrentConsumer" setting. 356 * <p>This returns the currently configured "concurrentConsumers" value; 357 * the number of currently scheduled/active consumers might differ. 358 * @see #getScheduledConsumerCount() 359 * @see #getActiveConsumerCount() 360 */ 361 public final int getConcurrentConsumers() { 362 synchronized (this.lifecycleMonitor) { 363 return this.concurrentConsumers; 364 } 365 } 366 367 /** 368 * Specify the maximum number of concurrent consumers to create. Default is 1. 369 * <p>If this setting is higher than "concurrentConsumers", the listener container 370 * will dynamically schedule new consumers at runtime, provided that enough 371 * incoming messages are encountered. Once the load goes down again, the number of 372 * consumers will be reduced to the standard level ("concurrentConsumers") again. 373 * <p>Raising the number of concurrent consumers is recommendable in order 374 * to scale the consumption of messages coming in from a queue. However, 375 * note that any ordering guarantees are lost once multiple consumers are 376 * registered. In general, stick with 1 consumer for low-volume queues. 377 * <p><b>Do not raise the number of concurrent consumers for a topic, 378 * unless vendor-specific setup measures clearly allow for it.</b> 379 * With regular setup, this would lead to concurrent consumption 380 * of the same message, which is hardly ever desirable. 
381 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 382 * @see #setConcurrentConsumers 383 */ 384 public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { 385 Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)"); 386 synchronized (this.lifecycleMonitor) { 387 this.maxConcurrentConsumers = Math.max(maxConcurrentConsumers, this.concurrentConsumers); 388 } 389 } 390 391 /** 392 * Return the "maxConcurrentConsumer" setting. 393 * <p>This returns the currently configured "maxConcurrentConsumers" value; 394 * the number of currently scheduled/active consumers might differ. 395 * @see #getScheduledConsumerCount() 396 * @see #getActiveConsumerCount() 397 */ 398 public final int getMaxConcurrentConsumers() { 399 synchronized (this.lifecycleMonitor) { 400 return this.maxConcurrentConsumers; 401 } 402 } 403 404 /** 405 * Specify the maximum number of messages to process in one task. 406 * More concretely, this limits the number of message reception attempts 407 * per task, which includes receive iterations that did not actually 408 * pick up a message until they hit their timeout (see the 409 * {@link #setReceiveTimeout "receiveTimeout"} property). 410 * <p>Default is unlimited (-1) in case of a standard TaskExecutor, 411 * reusing the original invoker threads until shutdown (at the 412 * expense of limited dynamic scheduling). 413 * <p>In case of a SchedulingTaskExecutor indicating a preference for 414 * short-lived tasks, the default is 10 instead. Specify a number 415 * of 10 to 100 messages to balance between rather long-lived and 416 * rather short-lived tasks here. 417 * <p>Long-lived tasks avoid frequent thread context switches through 418 * sticking with the same thread all the way through, while short-lived 419 * tasks allow thread pools to control the scheduling. Hence, thread 420 * pools will usually prefer short-lived tasks. 
421 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 422 * @see #setTaskExecutor 423 * @see #setReceiveTimeout 424 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks() 425 */ 426 public void setMaxMessagesPerTask(int maxMessagesPerTask) { 427 Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0"); 428 synchronized (this.lifecycleMonitor) { 429 this.maxMessagesPerTask = maxMessagesPerTask; 430 } 431 } 432 433 /** 434 * Return the maximum number of messages to process in one task. 435 */ 436 public final int getMaxMessagesPerTask() { 437 synchronized (this.lifecycleMonitor) { 438 return this.maxMessagesPerTask; 439 } 440 } 441 442 /** 443 * Specify the limit for the number of consumers that are allowed to be idle 444 * at any given time. 445 * <p>This limit is used by the {@link #scheduleNewInvokerIfAppropriate} method 446 * to determine if a new invoker should be created. Increasing the limit causes 447 * invokers to be created more aggressively. This can be useful to ramp up the 448 * number of invokers faster. 449 * <p>The default is 1, only scheduling a new invoker (which is likely to 450 * be idle initially) if none of the existing invokers is currently idle. 451 */ 452 public void setIdleConsumerLimit(int idleConsumerLimit) { 453 Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher"); 454 synchronized (this.lifecycleMonitor) { 455 this.idleConsumerLimit = idleConsumerLimit; 456 } 457 } 458 459 /** 460 * Return the limit for the number of idle consumers. 461 */ 462 public final int getIdleConsumerLimit() { 463 synchronized (this.lifecycleMonitor) { 464 return this.idleConsumerLimit; 465 } 466 } 467 468 /** 469 * Specify the limit for idle executions of a consumer task, not having 470 * received any message within its execution. If this limit is reached, 471 * the task will shut down and leave receiving to other executing tasks. 
472 * <p>The default is 1, closing idle resources early once a task didn't 473 * receive a message. This applies to dynamic scheduling only; see the 474 * {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting. 475 * The minimum number of consumers 476 * (see {@link #setConcurrentConsumers "concurrentConsumers"}) 477 * will be kept around until shutdown in any case. 478 * <p>Within each task execution, a number of message reception attempts 479 * (according to the "maxMessagesPerTask" setting) will each wait for an incoming 480 * message (according to the "receiveTimeout" setting). If all of those receive 481 * attempts in a given task return without a message, the task is considered 482 * idle with respect to received messages. Such a task may still be rescheduled; 483 * however, once it reached the specified "idleTaskExecutionLimit", it will 484 * shut down (in case of dynamic scaling). 485 * <p>Raise this limit if you encounter too frequent scaling up and down. 486 * With this limit being higher, an idle consumer will be kept around longer, 487 * avoiding the restart of a consumer once a new load of messages comes in. 488 * Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value, 489 * which will also lead to idle consumers being kept around for a longer time 490 * (while also increasing the average execution time of each scheduled task). 491 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 492 * @see #setMaxMessagesPerTask 493 * @see #setReceiveTimeout 494 */ 495 public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { 496 Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher"); 497 synchronized (this.lifecycleMonitor) { 498 this.idleTaskExecutionLimit = idleTaskExecutionLimit; 499 } 500 } 501 502 /** 503 * Return the limit for idle executions of a consumer task. 
504 */ 505 public final int getIdleTaskExecutionLimit() { 506 synchronized (this.lifecycleMonitor) { 507 return this.idleTaskExecutionLimit; 508 } 509 } 510 511 512 //------------------------------------------------------------------------- 513 // Implementation of AbstractMessageListenerContainer's template methods 514 //------------------------------------------------------------------------- 515 516 @Override 517 public void initialize() { 518 // Adapt default cache level. 519 if (this.cacheLevel == CACHE_AUTO) { 520 this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER); 521 } 522 523 // Prepare taskExecutor and maxMessagesPerTask. 524 synchronized (this.lifecycleMonitor) { 525 if (this.taskExecutor == null) { 526 this.taskExecutor = createDefaultTaskExecutor(); 527 } 528 else if (this.taskExecutor instanceof SchedulingTaskExecutor && 529 ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() && 530 this.maxMessagesPerTask == Integer.MIN_VALUE) { 531 // TaskExecutor indicated a preference for short-lived tasks. According to 532 // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case 533 // unless the user specified a custom value. 534 this.maxMessagesPerTask = 10; 535 } 536 } 537 538 // Proceed with actual listener initialization. 539 super.initialize(); 540 } 541 542 /** 543 * Creates the specified number of concurrent consumers, 544 * in the form of a JMS Session plus associated MessageConsumer 545 * running in a separate thread. 546 * @see #scheduleNewInvoker 547 * @see #setTaskExecutor 548 */ 549 @Override 550 protected void doInitialize() throws JMSException { 551 synchronized (this.lifecycleMonitor) { 552 for (int i = 0; i < this.concurrentConsumers; i++) { 553 scheduleNewInvoker(); 554 } 555 } 556 } 557 558 /** 559 * Destroy the registered JMS Sessions and associated MessageConsumers. 
560 */ 561 @Override 562 protected void doShutdown() throws JMSException { 563 logger.debug("Waiting for shutdown of message listener invokers"); 564 try { 565 synchronized (this.lifecycleMonitor) { 566 long receiveTimeout = getReceiveTimeout(); 567 long waitStartTime = System.currentTimeMillis(); 568 int waitCount = 0; 569 while (this.activeInvokerCount > 0) { 570 if (waitCount > 0 && !isAcceptMessagesWhileStopping() && 571 System.currentTimeMillis() - waitStartTime >= receiveTimeout) { 572 // Unexpectedly some invokers are still active after the receive timeout period 573 // -> interrupt remaining receive attempts since we'd reject the messages anyway 574 for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { 575 scheduledInvoker.interruptIfNecessary(); 576 } 577 } 578 if (logger.isDebugEnabled()) { 579 logger.debug("Still waiting for shutdown of " + this.activeInvokerCount + 580 " message listener invokers (iteration " + waitCount + ")"); 581 } 582 // Wait for AsyncMessageListenerInvokers to deactivate themselves... 583 if (receiveTimeout > 0) { 584 this.lifecycleMonitor.wait(receiveTimeout); 585 } 586 else { 587 this.lifecycleMonitor.wait(); 588 } 589 waitCount++; 590 } 591 // Clear remaining scheduled invokers, possibly left over as paused tasks 592 for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { 593 scheduledInvoker.clearResources(); 594 } 595 this.scheduledInvokers.clear(); 596 } 597 } 598 catch (InterruptedException ex) { 599 // Re-interrupt current thread, to allow other threads to react. 600 Thread.currentThread().interrupt(); 601 } 602 } 603 604 /** 605 * Overridden to reset the stop callback, if any. 606 */ 607 @Override 608 public void start() throws JmsException { 609 synchronized (this.lifecycleMonitor) { 610 this.stopCallback = null; 611 } 612 super.start(); 613 } 614 615 /** 616 * Stop this listener container, invoking the specific callback 617 * once all listener processing has actually stopped. 
618 * <p>Note: Further {@code stop(runnable)} calls (before processing 619 * has actually stopped) will override the specified callback. Only the 620 * latest specified callback will be invoked. 621 * <p>If a subsequent {@link #start()} call restarts the listener container 622 * before it has fully stopped, the callback will not get invoked at all. 623 * @param callback the callback to invoke once listener processing 624 * has fully stopped 625 * @throws JmsException if stopping failed 626 * @see #stop() 627 */ 628 @Override 629 public void stop(Runnable callback) throws JmsException { 630 synchronized (this.lifecycleMonitor) { 631 if (!isRunning() || this.stopCallback != null) { 632 // Not started, already stopped, or previous stop attempt in progress 633 // -> return immediately, no stop process to control anymore. 634 callback.run(); 635 return; 636 } 637 this.stopCallback = callback; 638 } 639 stop(); 640 } 641 642 /** 643 * Return the number of currently scheduled consumers. 644 * <p>This number will always be between "concurrentConsumers" and 645 * "maxConcurrentConsumers", but might be higher than "activeConsumerCount" 646 * (in case some consumers are scheduled but not executing at the moment). 647 * @see #getConcurrentConsumers() 648 * @see #getMaxConcurrentConsumers() 649 * @see #getActiveConsumerCount() 650 */ 651 public final int getScheduledConsumerCount() { 652 synchronized (this.lifecycleMonitor) { 653 return this.scheduledInvokers.size(); 654 } 655 } 656 657 /** 658 * Return the number of currently active consumers. 659 * <p>This number will always be between "concurrentConsumers" and 660 * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount" 661 * (in case some consumers are scheduled but not executing at the moment). 
662 * @see #getConcurrentConsumers() 663 * @see #getMaxConcurrentConsumers() 664 * @see #getActiveConsumerCount() 665 */ 666 public final int getActiveConsumerCount() { 667 synchronized (this.lifecycleMonitor) { 668 return this.activeInvokerCount; 669 } 670 } 671 672 /** 673 * Return whether at least one consumer has entered a fixed registration with the 674 * target destination. This is particularly interesting for the pub-sub case where 675 * it might be important to have an actual consumer registered that is guaranteed 676 * not to miss any messages that are just about to be published. 677 * <p>This method may be polled after a {@link #start()} call, until asynchronous 678 * registration of consumers has happened which is when the method will start returning 679 * {@code true} – provided that the listener container ever actually establishes 680 * a fixed registration. It will then keep returning {@code true} until shutdown, 681 * since the container will hold on to at least one consumer registration thereafter. 682 * <p>Note that a listener container is not bound to having a fixed registration in 683 * the first place. It may also keep recreating consumers for every invoker execution. 684 * This particularly depends on the {@link #setCacheLevel cache level} setting: 685 * only {@link #CACHE_CONSUMER} will lead to a fixed registration. 686 */ 687 public boolean isRegisteredWithDestination() { 688 synchronized (this.lifecycleMonitor) { 689 return (this.registeredWithDestination > 0); 690 } 691 } 692 693 694 /** 695 * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified. 696 * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} 697 * with the specified bean name (or the class name, if no bean name specified) as thread name prefix. 
698 * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String) 699 */ 700 protected TaskExecutor createDefaultTaskExecutor() { 701 String beanName = getBeanName(); 702 String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX); 703 return new SimpleAsyncTaskExecutor(threadNamePrefix); 704 } 705 706 /** 707 * Schedule a new invoker, increasing the total number of scheduled 708 * invokers for this listener container. 709 */ 710 private void scheduleNewInvoker() { 711 AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker(); 712 if (rescheduleTaskIfNecessary(invoker)) { 713 // This should always be true, since we're only calling this when active. 714 this.scheduledInvokers.add(invoker); 715 } 716 } 717 718 /** 719 * Use a shared JMS Connection depending on the "cacheLevel" setting. 720 * @see #setCacheLevel 721 * @see #CACHE_CONNECTION 722 */ 723 @Override 724 protected final boolean sharedConnectionEnabled() { 725 return (getCacheLevel() >= CACHE_CONNECTION); 726 } 727 728 /** 729 * Re-executes the given task via this listener container's TaskExecutor. 730 * @see #setTaskExecutor 731 */ 732 @Override 733 protected void doRescheduleTask(Object task) { 734 Assert.state(this.taskExecutor != null, "No TaskExecutor available"); 735 this.taskExecutor.execute((Runnable) task); 736 } 737 738 /** 739 * Tries scheduling a new invoker, since we know messages are coming in... 740 * @see #scheduleNewInvokerIfAppropriate() 741 */ 742 @Override 743 protected void messageReceived(Object invoker, Session session) { 744 ((AsyncMessageListenerInvoker) invoker).setIdle(false); 745 scheduleNewInvokerIfAppropriate(); 746 } 747 748 /** 749 * Marks the affected invoker as idle. 
750 */ 751 @Override 752 protected void noMessageReceived(Object invoker, Session session) { 753 ((AsyncMessageListenerInvoker) invoker).setIdle(true); 754 } 755 756 /** 757 * Schedule a new invoker, increasing the total number of scheduled 758 * invokers for this listener container, but only if the specified 759 * "maxConcurrentConsumers" limit has not been reached yet, and only 760 * if the specified "idleConsumerLimit" has not been reached either. 761 * <p>Called once a message has been received, in order to scale up while 762 * processing the message in the invoker that originally received it. 763 * @see #setTaskExecutor 764 * @see #getMaxConcurrentConsumers() 765 * @see #getIdleConsumerLimit() 766 */ 767 protected void scheduleNewInvokerIfAppropriate() { 768 if (isRunning()) { 769 resumePausedTasks(); 770 synchronized (this.lifecycleMonitor) { 771 if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && 772 getIdleInvokerCount() < this.idleConsumerLimit) { 773 scheduleNewInvoker(); 774 if (logger.isDebugEnabled()) { 775 logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size()); 776 } 777 } 778 } 779 } 780 } 781 782 /** 783 * Determine whether the current invoker should be rescheduled, 784 * given that it might not have received a message in a while. 785 * @param idleTaskExecutionCount the number of idle executions 786 * that this invoker task has already accumulated (in a row) 787 */ 788 private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) { 789 boolean superfluous = 790 (idleTaskExecutionCount >= this.idleTaskExecutionLimit && getIdleInvokerCount() > 1); 791 return (this.scheduledInvokers.size() <= 792 (superfluous ? this.concurrentConsumers : this.maxConcurrentConsumers)); 793 } 794 795 /** 796 * Determine whether this listener container currently has more 797 * than one idle instance among its scheduled invokers. 
798 */ 799 private int getIdleInvokerCount() { 800 int count = 0; 801 for (AsyncMessageListenerInvoker invoker : this.scheduledInvokers) { 802 if (invoker.isIdle()) { 803 count++; 804 } 805 } 806 return count; 807 } 808 809 810 /** 811 * Overridden to accept a failure in the initial setup - leaving it up to the 812 * asynchronous invokers to establish the shared Connection on first access. 813 * @see #refreshConnectionUntilSuccessful() 814 */ 815 @Override 816 protected void establishSharedConnection() { 817 try { 818 super.establishSharedConnection(); 819 } 820 catch (Exception ex) { 821 if (ex instanceof JMSException) { 822 invokeExceptionListener((JMSException) ex); 823 } 824 logger.debug("Could not establish shared JMS Connection - " + 825 "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex); 826 } 827 } 828 829 /** 830 * This implementations proceeds even after an exception thrown from 831 * {@code Connection.start()}, relying on listeners to perform 832 * appropriate recovery. 833 */ 834 @Override 835 protected void startSharedConnection() { 836 try { 837 super.startSharedConnection(); 838 } 839 catch (Exception ex) { 840 logger.debug("Connection start failed - relying on listeners to perform recovery", ex); 841 } 842 } 843 844 /** 845 * This implementations proceeds even after an exception thrown from 846 * {@code Connection.stop()}, relying on listeners to perform 847 * appropriate recovery after a restart. 848 */ 849 @Override 850 protected void stopSharedConnection() { 851 try { 852 super.stopSharedConnection(); 853 } 854 catch (Exception ex) { 855 logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex); 856 } 857 } 858 859 /** 860 * Handle the given exception that arose during setup of a listener. 861 * Called for every such exception in every concurrent listener. 
862 * <p>The default implementation logs the exception at warn level 863 * if not recovered yet, and at debug level if already recovered. 864 * Can be overridden in subclasses. 865 * @param ex the exception to handle 866 * @param alreadyRecovered whether a previously executing listener 867 * already recovered from the present listener setup failure 868 * (this usually indicates a follow-up failure than can be ignored 869 * other than for debug log purposes) 870 * @see #recoverAfterListenerSetupFailure() 871 */ 872 protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) { 873 if (ex instanceof JMSException) { 874 invokeExceptionListener((JMSException) ex); 875 } 876 if (ex instanceof SharedConnectionNotInitializedException) { 877 if (!alreadyRecovered) { 878 logger.debug("JMS message listener invoker needs to establish shared Connection"); 879 } 880 } 881 else { 882 // Recovery during active operation.. 883 if (alreadyRecovered) { 884 logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex); 885 } 886 else { 887 StringBuilder msg = new StringBuilder(); 888 msg.append("Setup of JMS message listener invoker failed for destination '"); 889 msg.append(getDestinationDescription()).append("' - trying to recover. Cause: "); 890 msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage()); 891 if (logger.isDebugEnabled()) { 892 logger.warn(msg, ex); 893 } 894 else { 895 logger.warn(msg); 896 } 897 } 898 } 899 } 900 901 /** 902 * Recover this listener container after a listener failed to set itself up, 903 * for example re-establishing the underlying Connection. 904 * <p>The default implementation delegates to DefaultMessageListenerContainer's 905 * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will 906 * try to re-establish a Connection to the JMS provider both for the shared 907 * and the non-shared Connection case. 
908 * @see #refreshConnectionUntilSuccessful() 909 * @see #refreshDestination() 910 */ 911 protected void recoverAfterListenerSetupFailure() { 912 this.recovering = true; 913 try { 914 refreshConnectionUntilSuccessful(); 915 refreshDestination(); 916 } 917 finally { 918 this.recovering = false; 919 this.interrupted = false; 920 } 921 } 922 923 /** 924 * Refresh the underlying Connection, not returning before an attempt has been 925 * successful. Called in case of a shared Connection as well as without shared 926 * Connection, so either needs to operate on the shared Connection or on a 927 * temporary Connection that just gets established for validation purposes. 928 * <p>The default implementation retries until it successfully established a 929 * Connection, for as long as this message listener container is running. 930 * Applies the specified recovery interval between retries. 931 * @see #setRecoveryInterval 932 * @see #start() 933 * @see #stop() 934 */ 935 protected void refreshConnectionUntilSuccessful() { 936 BackOffExecution execution = this.backOff.start(); 937 while (isRunning()) { 938 try { 939 if (sharedConnectionEnabled()) { 940 refreshSharedConnection(); 941 } 942 else { 943 Connection con = createConnection(); 944 JmsUtils.closeConnection(con); 945 } 946 logger.debug("Successfully refreshed JMS Connection"); 947 break; 948 } 949 catch (Exception ex) { 950 if (ex instanceof JMSException) { 951 invokeExceptionListener((JMSException) ex); 952 } 953 StringBuilder msg = new StringBuilder(); 954 msg.append("Could not refresh JMS Connection for destination '"); 955 msg.append(getDestinationDescription()).append("' - retrying using "); 956 msg.append(execution).append(". Cause: "); 957 msg.append(ex instanceof JMSException ? 
JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage()); 958 if (logger.isDebugEnabled()) { 959 logger.error(msg, ex); 960 } 961 else { 962 logger.error(msg); 963 } 964 } 965 if (!applyBackOffTime(execution)) { 966 StringBuilder msg = new StringBuilder(); 967 msg.append("Stopping container for destination '") 968 .append(getDestinationDescription()) 969 .append("': back-off policy does not allow ").append("for further attempts."); 970 logger.error(msg.toString()); 971 stop(); 972 } 973 } 974 } 975 976 /** 977 * Refresh the JMS destination that this listener container operates on. 978 * <p>Called after listener setup failure, assuming that a cached Destination 979 * object might have become invalid (a typical case on WebLogic JMS). 980 * <p>The default implementation removes the destination from a 981 * DestinationResolver's cache, in case of a CachingDestinationResolver. 982 * @see #setDestinationName 983 * @see org.springframework.jms.support.destination.CachingDestinationResolver 984 */ 985 protected void refreshDestination() { 986 String destName = getDestinationName(); 987 if (destName != null) { 988 DestinationResolver destResolver = getDestinationResolver(); 989 if (destResolver instanceof CachingDestinationResolver) { 990 ((CachingDestinationResolver) destResolver).removeFromCache(destName); 991 } 992 } 993 } 994 995 /** 996 * Apply the next back-off time using the specified {@link BackOffExecution}. 997 * <p>Return {@code true} if the back-off period has been applied and a new 998 * attempt to recover should be made, {@code false} if no further attempt 999 * should be made. 1000 * @since 4.1 1001 */ 1002 protected boolean applyBackOffTime(BackOffExecution execution) { 1003 if (this.recovering && this.interrupted) { 1004 // Interrupted right before and still failing... give up. 
1005 return false; 1006 } 1007 long interval = execution.nextBackOff(); 1008 if (interval == BackOffExecution.STOP) { 1009 return false; 1010 } 1011 else { 1012 try { 1013 synchronized (this.lifecycleMonitor) { 1014 this.lifecycleMonitor.wait(interval); 1015 } 1016 } 1017 catch (InterruptedException interEx) { 1018 // Re-interrupt current thread, to allow other threads to react. 1019 Thread.currentThread().interrupt(); 1020 if (this.recovering) { 1021 this.interrupted = true; 1022 } 1023 } 1024 return true; 1025 } 1026 } 1027 1028 /** 1029 * Return whether this listener container is currently in a recovery attempt. 1030 * <p>May be used to detect recovery phases but also the end of a recovery phase, 1031 * with {@code isRecovering()} switching to {@code false} after having been found 1032 * to return {@code true} before. 1033 * @see #recoverAfterListenerSetupFailure() 1034 */ 1035 public final boolean isRecovering() { 1036 return this.recovering; 1037 } 1038 1039 1040 //------------------------------------------------------------------------- 1041 // Inner classes used as internal adapters 1042 //------------------------------------------------------------------------- 1043 1044 /** 1045 * Runnable that performs looped {@code MessageConsumer.receive()} calls. 
*/
	private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {

		// Session held by this invoker; created in initResourcesIfNecessary()
		// for cache levels of CACHE_SESSION or above, reset by clearResources().
		@Nullable
		private Session session;

		// Consumer held by this invoker; created in initResourcesIfNecessary()
		// for cache levels of CACHE_CONSUMER or above, reset by clearResources().
		@Nullable
		private MessageConsumer consumer;

		// Snapshot of the container's currentRecoveryMarker, taken in updateRecoveryMarker().
		// Compared against the current marker in run() to detect a recovery by another invoker.
		@Nullable
		private Object lastRecoveryMarker;

		// Set to true after a listener invocation returned normally, reset to false on failure.
		private boolean lastMessageSucceeded;

		// Number of consecutive run() executions that ended without any message received.
		private int idleTaskExecutionCount = 0;

		private volatile boolean idle = true;

		// The thread currently inside invokeListener(), or null if there is none.
		@Nullable
		private volatile Thread currentReceiveThread;

		/**
		 * Execute one task run of this invoker: register as active, receive
		 * messages (in an ongoing loop if {@code maxMessagesPerTask} is negative,
		 * otherwise for at most {@code maxMessagesPerTask} listener invocations),
		 * trigger recovery on any failure, and finally either reschedule this
		 * invoker or remove it from the scheduled invokers.
		 */
		@Override
		public void run() {
			synchronized (lifecycleMonitor) {
				activeInvokerCount++;
				lifecycleMonitor.notifyAll();
			}
			boolean messageReceived = false;
			try {
				if (maxMessagesPerTask < 0) {
					messageReceived = executeOngoingLoop();
				}
				else {
					int messageCount = 0;
					while (isRunning() && messageCount < maxMessagesPerTask) {
						// invokeListener() comes first so that it is always executed;
						// messageReceived stays true once any invocation returned true.
						messageReceived = (invokeListener() || messageReceived);
						messageCount++;
					}
				}
			}
			catch (Throwable ex) {
				clearResources();
				if (!this.lastMessageSucceeded) {
					// We failed more than once in a row or on startup -
					// wait before first recovery attempt.
					waitBeforeRecoveryAttempt();
				}
				this.lastMessageSucceeded = false;
				boolean alreadyRecovered = false;
				synchronized (recoveryMonitor) {
					// Same marker as last seen by this invoker: no recovery has completed
					// in the meantime, so this invoker performs it and then installs a new marker.
					if (this.lastRecoveryMarker == currentRecoveryMarker) {
						handleListenerSetupFailure(ex, false);
						recoverAfterListenerSetupFailure();
						currentRecoveryMarker = new Object();
					}
					else {
						alreadyRecovered = true;
					}
				}
				// Reported outside of the recoveryMonitor block.
				if (alreadyRecovered) {
					handleListenerSetupFailure(ex, true);
				}
			}
			finally {
				synchronized (lifecycleMonitor) {
					decreaseActiveInvokerCount();
					lifecycleMonitor.notifyAll();
				}
				if (!messageReceived) {
					this.idleTaskExecutionCount++;
				}
				else {
					this.idleTaskExecutionCount = 0;
				}
				synchronized (lifecycleMonitor) {
					// rescheduleTaskIfNecessary is only attempted if rescheduling is desired at all.
					if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
						// We're shutting down completely.
						scheduledInvokers.remove(this);
						if (logger.isDebugEnabled()) {
							logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
						}
						lifecycleMonitor.notifyAll();
						clearResources();
					}
					else if (isRunning()) {
						int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
						if (nonPausedConsumers < 1) {
							logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
									"Check your thread pool configuration! Manual recovery necessary through a start() call.");
						}
						else if (nonPausedConsumers < getConcurrentConsumers()) {
							logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
									"due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
									"to be triggered by remaining consumers.");
						}
					}
				}
			}
		}

		/**
		 * Keep invoking the listener for as long as the container is active,
		 * waiting on the lifecycle monitor while the container is not running.
		 * The loop also ends once the number of scheduled invokers exceeds
		 * {@code maxConcurrentConsumers}.
		 * @return whether at least one invocation reported a received message
		 * @throws IllegalStateException if the thread got interrupted while waiting
		 * and the container is still active but not running afterwards
		 */
		private boolean executeOngoingLoop() throws JMSException {
			boolean messageReceived = false;
			boolean active = true;
			while (active) {
				synchronized (lifecycleMonitor) {
					boolean interrupted = false;
					boolean wasWaiting = false;
					while ((active = isActive()) && !isRunning()) {
						if (interrupted) {
							throw new IllegalStateException("Thread was interrupted while waiting for " +
									"a restart of the listener container, but container is still stopped");
						}
						// Not counted as an active invoker while waiting; decremented once only.
						if (!wasWaiting) {
							decreaseActiveInvokerCount();
						}
						wasWaiting = true;
						try {
							lifecycleMonitor.wait();
						}
						catch (InterruptedException ex) {
							// Re-interrupt current thread, to allow other threads to react.
							Thread.currentThread().interrupt();
							interrupted = true;
						}
					}
					// Counted as active again after having waited.
					if (wasWaiting) {
						activeInvokerCount++;
					}
					if (scheduledInvokers.size() > maxConcurrentConsumers) {
						active = false;
					}
				}
				if (active) {
					messageReceived = (invokeListener() || messageReceived);
				}
			}
			return messageReceived;
		}

		/**
		 * Perform a single receive-and-execute step, exposing the current thread
		 * as receive thread for the duration of the call.
		 * @return the result of {@code receiveAndExecute}
		 */
		private boolean invokeListener() throws JMSException {
			this.currentReceiveThread = Thread.currentThread();
			try {
				initResourcesIfNecessary();
				boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
				// Only reached if neither call above threw an exception.
				this.lastMessageSucceeded = true;
				return messageReceived;
			}
			finally {
				this.currentReceiveThread = null;
			}
		}

		/**
		 * Decrement the active invoker count, running and then clearing a
		 * registered stop callback once the count has dropped to zero.
		 * <p>All visible callers invoke this while holding {@code lifecycleMonitor}.
		 */
		private void decreaseActiveInvokerCount() {
			activeInvokerCount--;
			if (stopCallback != null && activeInvokerCount == 0) {
				stopCallback.run();
				stopCallback = null;
			}
		}

		/**
		 * Refresh the recovery marker and create the cached Session and
		 * MessageConsumer of this invoker, as far as the cache level asks for them
		 * and they do not exist yet.
		 */
		private void initResourcesIfNecessary() throws JMSException {
			if (getCacheLevel() <= CACHE_CONNECTION) {
				// Nothing gets cached in this invoker: just refresh the marker on every call.
				updateRecoveryMarker();
			}
			else {
				if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
					updateRecoveryMarker();
					this.session = createSession(getSharedConnection());
				}
				if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
					this.consumer = createListenerConsumer(this.session);
					// Counterpart of the decrement in clearResources().
					synchronized (lifecycleMonitor) {
						registeredWithDestination++;
					}
				}
			}
		}

		/**
		 * Remember the container's current recovery marker for this invoker.
		 */
		private void updateRecoveryMarker() {
			synchronized (recoveryMonitor) {
				this.lastRecoveryMarker = currentRecoveryMarker;
			}
		}

		/**
		 * Interrupt the thread that is currently inside {@code invokeListener()},
		 * if there is one and it has not been interrupted already.
		 */
		private void interruptIfNecessary() {
			// Copied to a local first, since the volatile field may be reset concurrently.
			Thread currentReceiveThread = this.currentReceiveThread;
			if (currentReceiveThread != null && !currentReceiveThread.isInterrupted()) {
				currentReceiveThread.interrupt();
			}
		}

		/**
		 * Close and release the MessageConsumer and Session of this invoker,
		 * adjusting the registration count if a consumer was held.
		 */
		private void clearResources() {
			if (sharedConnectionEnabled()) {
				// With a shared Connection, closing happens under the shared connection monitor.
				synchronized (sharedConnectionMonitor) {
					JmsUtils.closeMessageConsumer(this.consumer);
					JmsUtils.closeSession(this.session);
				}
			}
			else {
				JmsUtils.closeMessageConsumer(this.consumer);
				JmsUtils.closeSession(this.session);
			}
			if (this.consumer != null) {
				synchronized (lifecycleMonitor) {
					registeredWithDestination--;
				}
			}
			this.consumer = null;
			this.session = null;
		}

		/**
		 * Apply the back-off time once. In a regular scenario, the back-off is only applied if we
		 * failed to recover with the broker. This additional wait period avoids a burst retry
		 * scenario when the broker is actually up but something else is failing (i.e. listener
		 * specific).
		 */
		private void waitBeforeRecoveryAttempt() {
			// A fresh BackOffExecution is started for this single wait; the result is ignored.
			BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
			applyBackOffTime(execution);
		}

		/**
		 * Long-lived exactly when {@code maxMessagesPerTask} is negative,
		 * which is the case in which {@code run()} executes the ongoing loop.
		 */
		@Override
		public boolean isLongLived() {
			return (maxMessagesPerTask < 0);
		}

		/**
		 * Set the idle flag of this invoker.
		 */
		public void setIdle(boolean idle) {
			this.idle = idle;
		}

		/**
		 * Return the idle flag of this invoker (initially {@code true}).
		 */
		public boolean isIdle() {
			return this.idle;
		}
	}

}