001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import java.util.HashSet; 020import java.util.Set; 021import java.util.concurrent.Executor; 022import javax.jms.Connection; 023import javax.jms.JMSException; 024import javax.jms.MessageConsumer; 025import javax.jms.Session; 026 027import org.springframework.core.Constants; 028import org.springframework.core.task.SimpleAsyncTaskExecutor; 029import org.springframework.core.task.TaskExecutor; 030import org.springframework.jms.JmsException; 031import org.springframework.jms.support.JmsUtils; 032import org.springframework.jms.support.destination.CachingDestinationResolver; 033import org.springframework.jms.support.destination.DestinationResolver; 034import org.springframework.scheduling.SchedulingAwareRunnable; 035import org.springframework.scheduling.SchedulingTaskExecutor; 036import org.springframework.util.Assert; 037import org.springframework.util.ClassUtils; 038import org.springframework.util.backoff.BackOff; 039import org.springframework.util.backoff.BackOffExecution; 040import org.springframework.util.backoff.FixedBackOff; 041 042/** 043 * Message listener container variant that uses plain JMS client APIs, specifically 044 * a loop of {@code MessageConsumer.receive()} calls that also allow for 045 * transactional reception of messages (registering them with 
XA transactions).
 * Designed to work in a native JMS environment as well as in a Java EE environment,
 * with only minimal differences in configuration.
 *
 * <p>This is a simple but nevertheless powerful form of message listener container.
 * On startup, it obtains a fixed number of JMS Sessions to invoke the listener,
 * and optionally allows for dynamic adaptation at runtime (up to a maximum number).
 * Like {@link SimpleMessageListenerContainer}, its main advantage is its low level
 * of runtime complexity, in particular the minimal requirements on the JMS provider:
 * not even the JMS {@code ServerSessionPool} facility is required. Beyond that, it is
 * fully self-recovering in case the broker is temporarily unavailable, and allows
 * for stops/restarts as well as runtime changes to its configuration.
 *
 * <p>Actual {@code MessageListener} execution happens in asynchronous work units which are
 * created through Spring's {@link org.springframework.core.task.TaskExecutor TaskExecutor}
 * abstraction. By default, the specified number of invoker tasks will be created
 * on startup, according to the {@link #setConcurrentConsumers "concurrentConsumers"}
 * setting. Specify an alternative {@code TaskExecutor} to integrate with an existing
 * thread pool facility (such as a Java EE server's), for example using a
 * {@link org.springframework.scheduling.commonj.WorkManagerTaskExecutor CommonJ WorkManager}.
 * With a native JMS setup, each of those listener threads is going to use a
 * cached JMS {@code Session} and {@code MessageConsumer} (only refreshed in case
 * of failure), using the JMS provider's resources as efficiently as possible.
*
 * <p>Message reception and listener execution can automatically be wrapped
 * in transactions by passing a Spring
 * {@link org.springframework.transaction.PlatformTransactionManager} into the
 * {@link #setTransactionManager "transactionManager"} property. This will usually
 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
 * Java EE environment, in combination with a JTA-aware JMS {@code ConnectionFactory}
 * obtained from JNDI (check your Java EE server's documentation). Note that this
 * listener container will automatically reobtain all JMS handles for each transaction
 * in case an external transaction manager is specified, for compatibility with
 * all Java EE servers (in particular JBoss). This non-caching behavior can be
 * overridden through the {@link #setCacheLevel "cacheLevel"} /
 * {@link #setCacheLevelName "cacheLevelName"} property, enforcing caching of
 * the {@code Connection} (or also {@code Session} and {@code MessageConsumer})
 * even if an external transaction manager is involved.
 *
 * <p>Dynamic scaling of the number of concurrent invokers can be activated
 * by specifying a {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"}
 * value that is higher than the {@link #setConcurrentConsumers "concurrentConsumers"}
 * value. Since the latter's default is 1, you can also simply specify a
 * "maxConcurrentConsumers" of e.g. 5, which will lead to dynamic scaling up to
 * 5 concurrent consumers in case of increasing message load, as well as dynamic
 * shrinking back to the standard number of consumers once the load decreases.
* Consider adapting the {@link #setIdleTaskExecutionLimit "idleTaskExecutionLimit"}
 * setting to control the lifespan of each new task, to avoid frequent scaling up
 * and down, in particular if the {@code ConnectionFactory} does not pool JMS
 * {@code Sessions} and/or the {@code TaskExecutor} does not pool threads (check
 * your configuration!). Note that dynamic scaling only really makes sense for a
 * queue in the first place; for a topic, you will typically stick with the default
 * number of 1 consumer, otherwise you'd receive the same message multiple times on
 * the same node.
 *
 * <p><b>Note: Don't use Spring's {@link org.springframework.jms.connection.CachingConnectionFactory}
 * in combination with dynamic scaling.</b> Ideally, don't use it with a message
 * listener container at all, since it is generally preferable to let the
 * listener container itself handle appropriate caching within its lifecycle.
 * Also, stopping and restarting a listener container will only work with an
 * independent, locally cached Connection - not with an externally cached one.
 *
 * <p><b>It is strongly recommended to either set {@link #setSessionTransacted
 * "sessionTransacted"} to "true" or specify an external {@link #setTransactionManager
 * "transactionManager"}.</b> See the {@link AbstractMessageListenerContainer}
 * javadoc for details on acknowledge modes and native transaction options, as
 * well as the {@link AbstractPollingMessageListenerContainer} javadoc for details
 * on configuring an external transaction manager. Note that for the default
 * "AUTO_ACKNOWLEDGE" mode, this container applies automatic message acknowledgment
 * before listener execution, with no redelivery in case of an exception.
115 * 116 * @author Juergen Hoeller 117 * @since 2.0 118 * @see #setTransactionManager 119 * @see #setCacheLevel 120 * @see javax.jms.MessageConsumer#receive(long) 121 * @see SimpleMessageListenerContainer 122 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager 123 */ 124public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer { 125 126 /** 127 * Default thread name prefix: "DefaultMessageListenerContainer-". 128 */ 129 public static final String DEFAULT_THREAD_NAME_PREFIX = 130 ClassUtils.getShortName(DefaultMessageListenerContainer.class) + "-"; 131 132 /** 133 * The default recovery interval: 5000 ms = 5 seconds. 134 */ 135 public static final long DEFAULT_RECOVERY_INTERVAL = 5000; 136 137 138 /** 139 * Constant that indicates to cache no JMS resources at all. 140 * @see #setCacheLevel 141 */ 142 public static final int CACHE_NONE = 0; 143 144 /** 145 * Constant that indicates to cache a shared JMS {@code Connection} for each 146 * listener thread. 147 * @see #setCacheLevel 148 */ 149 public static final int CACHE_CONNECTION = 1; 150 151 /** 152 * Constant that indicates to cache a shared JMS {@code Connection} and a JMS 153 * {@code Session} for each listener thread. 154 * @see #setCacheLevel 155 */ 156 public static final int CACHE_SESSION = 2; 157 158 /** 159 * Constant that indicates to cache a shared JMS {@code Connection}, a JMS 160 * {@code Session}, and a JMS MessageConsumer for each listener thread. 161 * @see #setCacheLevel 162 */ 163 public static final int CACHE_CONSUMER = 3; 164 165 /** 166 * Constant that indicates automatic choice of an appropriate caching level 167 * (depending on the transaction management strategy). 
168 * @see #setCacheLevel 169 */ 170 public static final int CACHE_AUTO = 4; 171 172 173 private static final Constants constants = new Constants(DefaultMessageListenerContainer.class); 174 175 176 private Executor taskExecutor; 177 178 private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE); 179 180 private int cacheLevel = CACHE_AUTO; 181 182 private int concurrentConsumers = 1; 183 184 private int maxConcurrentConsumers = 1; 185 186 private int maxMessagesPerTask = Integer.MIN_VALUE; 187 188 private int idleConsumerLimit = 1; 189 190 private int idleTaskExecutionLimit = 1; 191 192 private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<AsyncMessageListenerInvoker>(); 193 194 private int activeInvokerCount = 0; 195 196 private int registeredWithDestination = 0; 197 198 private volatile boolean recovering = false; 199 200 private volatile boolean interrupted = false; 201 202 private Runnable stopCallback; 203 204 private Object currentRecoveryMarker = new Object(); 205 206 private final Object recoveryMonitor = new Object(); 207 208 209 /** 210 * Set the Spring {@code TaskExecutor} to use for running the listener threads. 211 * <p>Default is a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}, 212 * starting up a number of new threads, according to the specified number 213 * of concurrent consumers. 214 * <p>Specify an alternative {@code TaskExecutor} for integration with an existing 215 * thread pool. Note that this really only adds value if the threads are 216 * managed in a specific fashion, for example within a Java EE environment. 217 * A plain thread pool does not add much value, as this listener container 218 * will occupy a number of threads for its entire lifetime. 
219 * @see #setConcurrentConsumers 220 * @see org.springframework.core.task.SimpleAsyncTaskExecutor 221 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor 222 */ 223 public void setTaskExecutor(Executor taskExecutor) { 224 this.taskExecutor = taskExecutor; 225 } 226 227 /** 228 * Specify the {@link BackOff} instance to use to compute the interval 229 * between recovery attempts. If the {@link BackOffExecution} implementation 230 * returns {@link BackOffExecution#STOP}, this listener container will not further 231 * attempt to recover. 232 * <p>The {@link #setRecoveryInterval(long) recovery interval} is ignored 233 * when this property is set. 234 * @since 4.1 235 */ 236 public void setBackOff(BackOff backOff) { 237 this.backOff = backOff; 238 } 239 240 /** 241 * Specify the interval between recovery attempts, in <b>milliseconds</b>. 242 * The default is 5000 ms, that is, 5 seconds. This is a convenience method 243 * to create a {@link FixedBackOff} with the specified interval. 244 * <p>For more recovery options, consider specifying a {@link BackOff} 245 * instance instead. 246 * @see #setBackOff(BackOff) 247 * @see #handleListenerSetupFailure 248 */ 249 public void setRecoveryInterval(long recoveryInterval) { 250 this.backOff = new FixedBackOff(recoveryInterval, Long.MAX_VALUE); 251 } 252 253 /** 254 * Specify the level of caching that this listener container is allowed to apply, 255 * in the form of the name of the corresponding constant: e.g. "CACHE_CONNECTION". 256 * @see #setCacheLevel 257 */ 258 public void setCacheLevelName(String constantName) throws IllegalArgumentException { 259 if (constantName == null || !constantName.startsWith("CACHE_")) { 260 throw new IllegalArgumentException("Only cache constants allowed"); 261 } 262 setCacheLevel(constants.asNumber(constantName).intValue()); 263 } 264 265 /** 266 * Specify the level of caching that this listener container is allowed to apply. 
267 * <p>Default is {@link #CACHE_NONE} if an external transaction manager has been specified 268 * (to reobtain all resources freshly within the scope of the external transaction), 269 * and {@link #CACHE_CONSUMER} otherwise (operating with local JMS resources). 270 * <p>Some Java EE servers only register their JMS resources with an ongoing XA 271 * transaction in case of a freshly obtained JMS {@code Connection} and {@code Session}, 272 * which is why this listener container by default does not cache any of those. 273 * However, depending on the rules of your server with respect to the caching 274 * of transactional resources, consider switching this setting to at least 275 * {@link #CACHE_CONNECTION} or {@link #CACHE_SESSION} even in conjunction with an 276 * external transaction manager. 277 * @see #CACHE_NONE 278 * @see #CACHE_CONNECTION 279 * @see #CACHE_SESSION 280 * @see #CACHE_CONSUMER 281 * @see #setCacheLevelName 282 * @see #setTransactionManager 283 */ 284 public void setCacheLevel(int cacheLevel) { 285 this.cacheLevel = cacheLevel; 286 } 287 288 /** 289 * Return the level of caching that this listener container is allowed to apply. 290 */ 291 public int getCacheLevel() { 292 return this.cacheLevel; 293 } 294 295 296 /** 297 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple 298 * upper limit String, e.g. "10" (the lower limit will be 1 in this case). 299 * <p>This listener container will always hold on to the minimum number of consumers 300 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number 301 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load. 
302 */ 303 @Override 304 public void setConcurrency(String concurrency) { 305 try { 306 int separatorIndex = concurrency.indexOf('-'); 307 if (separatorIndex != -1) { 308 setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex))); 309 setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1))); 310 } 311 else { 312 setConcurrentConsumers(1); 313 setMaxConcurrentConsumers(Integer.parseInt(concurrency)); 314 } 315 } 316 catch (NumberFormatException ex) { 317 throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " + 318 "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported."); 319 } 320 } 321 322 /** 323 * Specify the number of concurrent consumers to create. Default is 1. 324 * <p>Specifying a higher value for this setting will increase the standard 325 * level of scheduled concurrent consumers at runtime: This is effectively 326 * the minimum number of concurrent consumers which will be scheduled 327 * at any given time. This is a static setting; for dynamic scaling, 328 * consider specifying the "maxConcurrentConsumers" setting instead. 329 * <p>Raising the number of concurrent consumers is recommendable in order 330 * to scale the consumption of messages coming in from a queue. However, 331 * note that any ordering guarantees are lost once multiple consumers are 332 * registered. In general, stick with 1 consumer for low-volume queues. 333 * <p><b>Do not raise the number of concurrent consumers for a topic, 334 * unless vendor-specific setup measures clearly allow for it.</b> 335 * With regular setup, this would lead to concurrent consumption 336 * of the same message, which is hardly ever desirable. 
337 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 338 * @see #setMaxConcurrentConsumers 339 */ 340 public void setConcurrentConsumers(int concurrentConsumers) { 341 Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)"); 342 synchronized (this.lifecycleMonitor) { 343 this.concurrentConsumers = concurrentConsumers; 344 if (this.maxConcurrentConsumers < concurrentConsumers) { 345 this.maxConcurrentConsumers = concurrentConsumers; 346 } 347 } 348 } 349 350 /** 351 * Return the "concurrentConsumer" setting. 352 * <p>This returns the currently configured "concurrentConsumers" value; 353 * the number of currently scheduled/active consumers might differ. 354 * @see #getScheduledConsumerCount() 355 * @see #getActiveConsumerCount() 356 */ 357 public final int getConcurrentConsumers() { 358 synchronized (this.lifecycleMonitor) { 359 return this.concurrentConsumers; 360 } 361 } 362 363 /** 364 * Specify the maximum number of concurrent consumers to create. Default is 1. 365 * <p>If this setting is higher than "concurrentConsumers", the listener container 366 * will dynamically schedule new consumers at runtime, provided that enough 367 * incoming messages are encountered. Once the load goes down again, the number of 368 * consumers will be reduced to the standard level ("concurrentConsumers") again. 369 * <p>Raising the number of concurrent consumers is recommendable in order 370 * to scale the consumption of messages coming in from a queue. However, 371 * note that any ordering guarantees are lost once multiple consumers are 372 * registered. In general, stick with 1 consumer for low-volume queues. 373 * <p><b>Do not raise the number of concurrent consumers for a topic, 374 * unless vendor-specific setup measures clearly allow for it.</b> 375 * With regular setup, this would lead to concurrent consumption 376 * of the same message, which is hardly ever desirable. 
377 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 378 * @see #setConcurrentConsumers 379 */ 380 public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { 381 Assert.isTrue(maxConcurrentConsumers > 0, "'maxConcurrentConsumers' value must be at least 1 (one)"); 382 synchronized (this.lifecycleMonitor) { 383 this.maxConcurrentConsumers = Math.max(maxConcurrentConsumers, this.concurrentConsumers); 384 } 385 } 386 387 /** 388 * Return the "maxConcurrentConsumer" setting. 389 * <p>This returns the currently configured "maxConcurrentConsumers" value; 390 * the number of currently scheduled/active consumers might differ. 391 * @see #getScheduledConsumerCount() 392 * @see #getActiveConsumerCount() 393 */ 394 public final int getMaxConcurrentConsumers() { 395 synchronized (this.lifecycleMonitor) { 396 return this.maxConcurrentConsumers; 397 } 398 } 399 400 /** 401 * Specify the maximum number of messages to process in one task. 402 * More concretely, this limits the number of message reception attempts 403 * per task, which includes receive iterations that did not actually 404 * pick up a message until they hit their timeout (see the 405 * {@link #setReceiveTimeout "receiveTimeout"} property). 406 * <p>Default is unlimited (-1) in case of a standard TaskExecutor, 407 * reusing the original invoker threads until shutdown (at the 408 * expense of limited dynamic scheduling). 409 * <p>In case of a SchedulingTaskExecutor indicating a preference for 410 * short-lived tasks, the default is 10 instead. Specify a number 411 * of 10 to 100 messages to balance between rather long-lived and 412 * rather short-lived tasks here. 413 * <p>Long-lived tasks avoid frequent thread context switches through 414 * sticking with the same thread all the way through, while short-lived 415 * tasks allow thread pools to control the scheduling. Hence, thread 416 * pools will usually prefer short-lived tasks. 
417 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 418 * @see #setTaskExecutor 419 * @see #setReceiveTimeout 420 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks() 421 */ 422 public void setMaxMessagesPerTask(int maxMessagesPerTask) { 423 Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0"); 424 synchronized (this.lifecycleMonitor) { 425 this.maxMessagesPerTask = maxMessagesPerTask; 426 } 427 } 428 429 /** 430 * Return the maximum number of messages to process in one task. 431 */ 432 public final int getMaxMessagesPerTask() { 433 synchronized (this.lifecycleMonitor) { 434 return this.maxMessagesPerTask; 435 } 436 } 437 438 /** 439 * Specify the limit for the number of consumers that are allowed to be idle 440 * at any given time. 441 * <p>This limit is used by the {@link #scheduleNewInvokerIfAppropriate} method 442 * to determine if a new invoker should be created. Increasing the limit causes 443 * invokers to be created more aggressively. This can be useful to ramp up the 444 * number of invokers faster. 445 * <p>The default is 1, only scheduling a new invoker (which is likely to 446 * be idle initially) if none of the existing invokers is currently idle. 447 */ 448 public void setIdleConsumerLimit(int idleConsumerLimit) { 449 Assert.isTrue(idleConsumerLimit > 0, "'idleConsumerLimit' must be 1 or higher"); 450 synchronized (this.lifecycleMonitor) { 451 this.idleConsumerLimit = idleConsumerLimit; 452 } 453 } 454 455 /** 456 * Return the limit for the number of idle consumers. 457 */ 458 public final int getIdleConsumerLimit() { 459 synchronized (this.lifecycleMonitor) { 460 return this.idleConsumerLimit; 461 } 462 } 463 464 /** 465 * Specify the limit for idle executions of a consumer task, not having 466 * received any message within its execution. If this limit is reached, 467 * the task will shut down and leave receiving to other executing tasks. 
468 * <p>The default is 1, closing idle resources early once a task didn't 469 * receive a message. This applies to dynamic scheduling only; see the 470 * {@link #setMaxConcurrentConsumers "maxConcurrentConsumers"} setting. 471 * The minimum number of consumers 472 * (see {@link #setConcurrentConsumers "concurrentConsumers"}) 473 * will be kept around until shutdown in any case. 474 * <p>Within each task execution, a number of message reception attempts 475 * (according to the "maxMessagesPerTask" setting) will each wait for an incoming 476 * message (according to the "receiveTimeout" setting). If all of those receive 477 * attempts in a given task return without a message, the task is considered 478 * idle with respect to received messages. Such a task may still be rescheduled; 479 * however, once it reached the specified "idleTaskExecutionLimit", it will 480 * shut down (in case of dynamic scaling). 481 * <p>Raise this limit if you encounter too frequent scaling up and down. 482 * With this limit being higher, an idle consumer will be kept around longer, 483 * avoiding the restart of a consumer once a new load of messages comes in. 484 * Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value, 485 * which will also lead to idle consumers being kept around for a longer time 486 * (while also increasing the average execution time of each scheduled task). 487 * <p><b>This setting can be modified at runtime, for example through JMX.</b> 488 * @see #setMaxMessagesPerTask 489 * @see #setReceiveTimeout 490 */ 491 public void setIdleTaskExecutionLimit(int idleTaskExecutionLimit) { 492 Assert.isTrue(idleTaskExecutionLimit > 0, "'idleTaskExecutionLimit' must be 1 or higher"); 493 synchronized (this.lifecycleMonitor) { 494 this.idleTaskExecutionLimit = idleTaskExecutionLimit; 495 } 496 } 497 498 /** 499 * Return the limit for idle executions of a consumer task. 
500 */ 501 public final int getIdleTaskExecutionLimit() { 502 synchronized (this.lifecycleMonitor) { 503 return this.idleTaskExecutionLimit; 504 } 505 } 506 507 508 //------------------------------------------------------------------------- 509 // Implementation of AbstractMessageListenerContainer's template methods 510 //------------------------------------------------------------------------- 511 512 @Override 513 public void initialize() { 514 // Adapt default cache level. 515 if (this.cacheLevel == CACHE_AUTO) { 516 this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER); 517 } 518 519 // Prepare taskExecutor and maxMessagesPerTask. 520 synchronized (this.lifecycleMonitor) { 521 if (this.taskExecutor == null) { 522 this.taskExecutor = createDefaultTaskExecutor(); 523 } 524 else if (this.taskExecutor instanceof SchedulingTaskExecutor && 525 ((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() && 526 this.maxMessagesPerTask == Integer.MIN_VALUE) { 527 // TaskExecutor indicated a preference for short-lived tasks. According to 528 // setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case 529 // unless the user specified a custom value. 530 this.maxMessagesPerTask = 10; 531 } 532 } 533 534 // Proceed with actual listener initialization. 535 super.initialize(); 536 } 537 538 /** 539 * Creates the specified number of concurrent consumers, 540 * in the form of a JMS Session plus associated MessageConsumer 541 * running in a separate thread. 542 * @see #scheduleNewInvoker 543 * @see #setTaskExecutor 544 */ 545 @Override 546 protected void doInitialize() throws JMSException { 547 synchronized (this.lifecycleMonitor) { 548 for (int i = 0; i < this.concurrentConsumers; i++) { 549 scheduleNewInvoker(); 550 } 551 } 552 } 553 554 /** 555 * Destroy the registered JMS Sessions and associated MessageConsumers. 
556 */ 557 @Override 558 protected void doShutdown() throws JMSException { 559 logger.debug("Waiting for shutdown of message listener invokers"); 560 try { 561 synchronized (this.lifecycleMonitor) { 562 int waitCount = 0; 563 while (this.activeInvokerCount > 0) { 564 if (logger.isDebugEnabled()) { 565 logger.debug("Still waiting for shutdown of " + this.activeInvokerCount + 566 " message listener invokers (iteration " + waitCount + ")"); 567 } 568 // Wait for AsyncMessageListenerInvokers to deactivate themselves... 569 long timeout = getReceiveTimeout(); 570 if (timeout > 0) { 571 this.lifecycleMonitor.wait(timeout); 572 } 573 else { 574 this.lifecycleMonitor.wait(); 575 } 576 waitCount++; 577 } 578 // Clear remaining scheduled invokers, possibly left over as paused tasks 579 for (AsyncMessageListenerInvoker scheduledInvoker : this.scheduledInvokers) { 580 scheduledInvoker.clearResources(); 581 } 582 this.scheduledInvokers.clear(); 583 } 584 } 585 catch (InterruptedException ex) { 586 // Re-interrupt current thread, to allow other threads to react. 587 Thread.currentThread().interrupt(); 588 } 589 } 590 591 /** 592 * Overridden to reset the stop callback, if any. 593 */ 594 @Override 595 public void start() throws JmsException { 596 synchronized (this.lifecycleMonitor) { 597 this.stopCallback = null; 598 } 599 super.start(); 600 } 601 602 /** 603 * Stop this listener container, invoking the specific callback 604 * once all listener processing has actually stopped. 605 * <p>Note: Further {@code stop(runnable)} calls (before processing 606 * has actually stopped) will override the specified callback. Only the 607 * latest specified callback will be invoked. 608 * <p>If a subsequent {@link #start()} call restarts the listener container 609 * before it has fully stopped, the callback will not get invoked at all. 
610 * @param callback the callback to invoke once listener processing 611 * has fully stopped 612 * @throws JmsException if stopping failed 613 * @see #stop() 614 */ 615 @Override 616 public void stop(Runnable callback) throws JmsException { 617 synchronized (this.lifecycleMonitor) { 618 if (!isRunning() || this.stopCallback != null) { 619 // Not started, already stopped, or previous stop attempt in progress 620 // -> return immediately, no stop process to control anymore. 621 callback.run(); 622 return; 623 } 624 this.stopCallback = callback; 625 } 626 stop(); 627 } 628 629 /** 630 * Return the number of currently scheduled consumers. 631 * <p>This number will always be between "concurrentConsumers" and 632 * "maxConcurrentConsumers", but might be higher than "activeConsumerCount" 633 * (in case some consumers are scheduled but not executing at the moment). 634 * @see #getConcurrentConsumers() 635 * @see #getMaxConcurrentConsumers() 636 * @see #getActiveConsumerCount() 637 */ 638 public final int getScheduledConsumerCount() { 639 synchronized (this.lifecycleMonitor) { 640 return this.scheduledInvokers.size(); 641 } 642 } 643 644 /** 645 * Return the number of currently active consumers. 646 * <p>This number will always be between "concurrentConsumers" and 647 * "maxConcurrentConsumers", but might be lower than "scheduledConsumerCount" 648 * (in case some consumers are scheduled but not executing at the moment). 649 * @see #getConcurrentConsumers() 650 * @see #getMaxConcurrentConsumers() 651 * @see #getActiveConsumerCount() 652 */ 653 public final int getActiveConsumerCount() { 654 synchronized (this.lifecycleMonitor) { 655 return this.activeInvokerCount; 656 } 657 } 658 659 /** 660 * Return whether at least one consumer has entered a fixed registration with the 661 * target destination. 
This is particularly interesting for the pub-sub case where 662 * it might be important to have an actual consumer registered that is guaranteed 663 * not to miss any messages that are just about to be published. 664 * <p>This method may be polled after a {@link #start()} call, until asynchronous 665 * registration of consumers has happened which is when the method will start returning 666 * {@code true} – provided that the listener container ever actually establishes 667 * a fixed registration. It will then keep returning {@code true} until shutdown, 668 * since the container will hold on to at least one consumer registration thereafter. 669 * <p>Note that a listener container is not bound to having a fixed registration in 670 * the first place. It may also keep recreating consumers for every invoker execution. 671 * This particularly depends on the {@link #setCacheLevel cache level} setting: 672 * only {@link #CACHE_CONSUMER} will lead to a fixed registration. 673 */ 674 public boolean isRegisteredWithDestination() { 675 synchronized (this.lifecycleMonitor) { 676 return (this.registeredWithDestination > 0); 677 } 678 } 679 680 681 /** 682 * Create a default TaskExecutor. Called if no explicit TaskExecutor has been specified. 683 * <p>The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} 684 * with the specified bean name (or the class name, if no bean name specified) as thread name prefix. 685 * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String) 686 */ 687 protected TaskExecutor createDefaultTaskExecutor() { 688 String beanName = getBeanName(); 689 String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX); 690 return new SimpleAsyncTaskExecutor(threadNamePrefix); 691 } 692 693 /** 694 * Schedule a new invoker, increasing the total number of scheduled 695 * invokers for this listener container. 
696 */ 697 private void scheduleNewInvoker() { 698 AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker(); 699 if (rescheduleTaskIfNecessary(invoker)) { 700 // This should always be true, since we're only calling this when active. 701 this.scheduledInvokers.add(invoker); 702 } 703 } 704 705 /** 706 * Use a shared JMS Connection depending on the "cacheLevel" setting. 707 * @see #setCacheLevel 708 * @see #CACHE_CONNECTION 709 */ 710 @Override 711 protected final boolean sharedConnectionEnabled() { 712 return (getCacheLevel() >= CACHE_CONNECTION); 713 } 714 715 /** 716 * Re-executes the given task via this listener container's TaskExecutor. 717 * @see #setTaskExecutor 718 */ 719 @Override 720 protected void doRescheduleTask(Object task) { 721 this.taskExecutor.execute((Runnable) task); 722 } 723 724 /** 725 * Tries scheduling a new invoker, since we know messages are coming in... 726 * @see #scheduleNewInvokerIfAppropriate() 727 */ 728 @Override 729 protected void messageReceived(Object invoker, Session session) { 730 ((AsyncMessageListenerInvoker) invoker).setIdle(false); 731 scheduleNewInvokerIfAppropriate(); 732 } 733 734 /** 735 * Marks the affected invoker as idle. 736 */ 737 @Override 738 protected void noMessageReceived(Object invoker, Session session) { 739 ((AsyncMessageListenerInvoker) invoker).setIdle(true); 740 } 741 742 /** 743 * Schedule a new invoker, increasing the total number of scheduled 744 * invokers for this listener container, but only if the specified 745 * "maxConcurrentConsumers" limit has not been reached yet, and only 746 * if the specified "idleConsumerLimit" has not been reached either. 747 * <p>Called once a message has been received, in order to scale up while 748 * processing the message in the invoker that originally received it. 
749 * @see #setTaskExecutor 750 * @see #getMaxConcurrentConsumers() 751 * @see #getIdleConsumerLimit() 752 */ 753 protected void scheduleNewInvokerIfAppropriate() { 754 if (isRunning()) { 755 resumePausedTasks(); 756 synchronized (this.lifecycleMonitor) { 757 if (this.scheduledInvokers.size() < this.maxConcurrentConsumers && 758 getIdleInvokerCount() < this.idleConsumerLimit) { 759 scheduleNewInvoker(); 760 if (logger.isDebugEnabled()) { 761 logger.debug("Raised scheduled invoker count: " + this.scheduledInvokers.size()); 762 } 763 } 764 } 765 } 766 } 767 768 /** 769 * Determine whether the current invoker should be rescheduled, 770 * given that it might not have received a message in a while. 771 * @param idleTaskExecutionCount the number of idle executions 772 * that this invoker task has already accumulated (in a row) 773 */ 774 private boolean shouldRescheduleInvoker(int idleTaskExecutionCount) { 775 boolean superfluous = 776 (idleTaskExecutionCount >= this.idleTaskExecutionLimit && getIdleInvokerCount() > 1); 777 return (this.scheduledInvokers.size() <= 778 (superfluous ? this.concurrentConsumers : this.maxConcurrentConsumers)); 779 } 780 781 /** 782 * Determine whether this listener container currently has more 783 * than one idle instance among its scheduled invokers. 784 */ 785 private int getIdleInvokerCount() { 786 int count = 0; 787 for (AsyncMessageListenerInvoker invoker : this.scheduledInvokers) { 788 if (invoker.isIdle()) { 789 count++; 790 } 791 } 792 return count; 793 } 794 795 796 /** 797 * Overridden to accept a failure in the initial setup - leaving it up to the 798 * asynchronous invokers to establish the shared Connection on first access. 
799 * @see #refreshConnectionUntilSuccessful() 800 */ 801 @Override 802 protected void establishSharedConnection() { 803 try { 804 super.establishSharedConnection(); 805 } 806 catch (Exception ex) { 807 if (ex instanceof JMSException) { 808 invokeExceptionListener((JMSException) ex); 809 } 810 logger.debug("Could not establish shared JMS Connection - " + 811 "leaving it up to asynchronous invokers to establish a Connection as soon as possible", ex); 812 } 813 } 814 815 /** 816 * This implementations proceeds even after an exception thrown from 817 * {@code Connection.start()}, relying on listeners to perform 818 * appropriate recovery. 819 */ 820 @Override 821 protected void startSharedConnection() { 822 try { 823 super.startSharedConnection(); 824 } 825 catch (Exception ex) { 826 logger.debug("Connection start failed - relying on listeners to perform recovery", ex); 827 } 828 } 829 830 /** 831 * This implementations proceeds even after an exception thrown from 832 * {@code Connection.stop()}, relying on listeners to perform 833 * appropriate recovery after a restart. 834 */ 835 @Override 836 protected void stopSharedConnection() { 837 try { 838 super.stopSharedConnection(); 839 } 840 catch (Exception ex) { 841 logger.debug("Connection stop failed - relying on listeners to perform recovery after restart", ex); 842 } 843 } 844 845 /** 846 * Handle the given exception that arose during setup of a listener. 847 * Called for every such exception in every concurrent listener. 848 * <p>The default implementation logs the exception at warn level 849 * if not recovered yet, and at debug level if already recovered. 850 * Can be overridden in subclasses. 
851 * @param ex the exception to handle 852 * @param alreadyRecovered whether a previously executing listener 853 * already recovered from the present listener setup failure 854 * (this usually indicates a follow-up failure than can be ignored 855 * other than for debug log purposes) 856 * @see #recoverAfterListenerSetupFailure() 857 */ 858 protected void handleListenerSetupFailure(Throwable ex, boolean alreadyRecovered) { 859 if (ex instanceof JMSException) { 860 invokeExceptionListener((JMSException) ex); 861 } 862 if (ex instanceof SharedConnectionNotInitializedException) { 863 if (!alreadyRecovered) { 864 logger.info("JMS message listener invoker needs to establish shared Connection"); 865 } 866 } 867 else { 868 // Recovery during active operation.. 869 if (alreadyRecovered) { 870 logger.debug("Setup of JMS message listener invoker failed - already recovered by other invoker", ex); 871 } 872 else { 873 StringBuilder msg = new StringBuilder(); 874 msg.append("Setup of JMS message listener invoker failed for destination '"); 875 msg.append(getDestinationDescription()).append("' - trying to recover. Cause: "); 876 msg.append(ex instanceof JMSException ? JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage()); 877 if (logger.isDebugEnabled()) { 878 logger.warn(msg, ex); 879 } 880 else { 881 logger.warn(msg); 882 } 883 } 884 } 885 } 886 887 /** 888 * Recover this listener container after a listener failed to set itself up, 889 * for example re-establishing the underlying Connection. 890 * <p>The default implementation delegates to DefaultMessageListenerContainer's 891 * recovery-capable {@link #refreshConnectionUntilSuccessful()} method, which will 892 * try to re-establish a Connection to the JMS provider both for the shared 893 * and the non-shared Connection case. 
894 * @see #refreshConnectionUntilSuccessful() 895 * @see #refreshDestination() 896 */ 897 protected void recoverAfterListenerSetupFailure() { 898 this.recovering = true; 899 try { 900 refreshConnectionUntilSuccessful(); 901 refreshDestination(); 902 } 903 finally { 904 this.recovering = false; 905 this.interrupted = false; 906 } 907 } 908 909 /** 910 * Refresh the underlying Connection, not returning before an attempt has been 911 * successful. Called in case of a shared Connection as well as without shared 912 * Connection, so either needs to operate on the shared Connection or on a 913 * temporary Connection that just gets established for validation purposes. 914 * <p>The default implementation retries until it successfully established a 915 * Connection, for as long as this message listener container is running. 916 * Applies the specified recovery interval between retries. 917 * @see #setRecoveryInterval 918 * @see #start() 919 * @see #stop() 920 */ 921 protected void refreshConnectionUntilSuccessful() { 922 BackOffExecution execution = this.backOff.start(); 923 while (isRunning()) { 924 try { 925 if (sharedConnectionEnabled()) { 926 refreshSharedConnection(); 927 } 928 else { 929 Connection con = createConnection(); 930 JmsUtils.closeConnection(con); 931 } 932 logger.info("Successfully refreshed JMS Connection"); 933 break; 934 } 935 catch (Exception ex) { 936 if (ex instanceof JMSException) { 937 invokeExceptionListener((JMSException) ex); 938 } 939 StringBuilder msg = new StringBuilder(); 940 msg.append("Could not refresh JMS Connection for destination '"); 941 msg.append(getDestinationDescription()).append("' - retrying using "); 942 msg.append(execution).append(". Cause: "); 943 msg.append(ex instanceof JMSException ? 
JmsUtils.buildExceptionMessage((JMSException) ex) : ex.getMessage()); 944 if (logger.isDebugEnabled()) { 945 logger.error(msg, ex); 946 } 947 else { 948 logger.error(msg); 949 } 950 } 951 if (!applyBackOffTime(execution)) { 952 StringBuilder msg = new StringBuilder(); 953 msg.append("Stopping container for destination '") 954 .append(getDestinationDescription()) 955 .append("': back-off policy does not allow ").append("for further attempts."); 956 logger.error(msg.toString()); 957 stop(); 958 } 959 } 960 } 961 962 /** 963 * Refresh the JMS destination that this listener container operates on. 964 * <p>Called after listener setup failure, assuming that a cached Destination 965 * object might have become invalid (a typical case on WebLogic JMS). 966 * <p>The default implementation removes the destination from a 967 * DestinationResolver's cache, in case of a CachingDestinationResolver. 968 * @see #setDestinationName 969 * @see org.springframework.jms.support.destination.CachingDestinationResolver 970 */ 971 protected void refreshDestination() { 972 String destName = getDestinationName(); 973 if (destName != null) { 974 DestinationResolver destResolver = getDestinationResolver(); 975 if (destResolver instanceof CachingDestinationResolver) { 976 ((CachingDestinationResolver) destResolver).removeFromCache(destName); 977 } 978 } 979 } 980 981 /** 982 * Apply the next back-off time using the specified {@link BackOffExecution}. 983 * <p>Return {@code true} if the back-off period has been applied and a new 984 * attempt to recover should be made, {@code false} if no further attempt 985 * should be made. 986 * @since 4.1 987 */ 988 protected boolean applyBackOffTime(BackOffExecution execution) { 989 if (this.recovering && this.interrupted) { 990 // Interrupted right before and still failing... give up. 
991 return false; 992 } 993 long interval = execution.nextBackOff(); 994 if (interval == BackOffExecution.STOP) { 995 return false; 996 } 997 else { 998 try { 999 synchronized (this.lifecycleMonitor) { 1000 this.lifecycleMonitor.wait(interval); 1001 } 1002 } 1003 catch (InterruptedException interEx) { 1004 // Re-interrupt current thread, to allow other threads to react. 1005 Thread.currentThread().interrupt(); 1006 if (this.recovering) { 1007 this.interrupted = true; 1008 } 1009 } 1010 return true; 1011 } 1012 } 1013 1014 /** 1015 * Return whether this listener container is currently in a recovery attempt. 1016 * <p>May be used to detect recovery phases but also the end of a recovery phase, 1017 * with {@code isRecovering()} switching to {@code false} after having been found 1018 * to return {@code true} before. 1019 * @see #recoverAfterListenerSetupFailure() 1020 */ 1021 public final boolean isRecovering() { 1022 return this.recovering; 1023 } 1024 1025 1026 //------------------------------------------------------------------------- 1027 // Inner classes used as internal adapters 1028 //------------------------------------------------------------------------- 1029 1030 /** 1031 * Runnable that performs looped {@code MessageConsumer.receive()} calls. 
     */
    private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {

        // Session held by this invoker; created in initResourcesIfNecessary(), reset in clearResources().
        private Session session;

        // Consumer held by this invoker; created in initResourcesIfNecessary(), reset in clearResources().
        private MessageConsumer consumer;

        // Snapshot of the container's currentRecoveryMarker, taken in updateRecoveryMarker().
        private Object lastRecoveryMarker;

        // Set to true after each completed invokeListener() call, reset to false after a failure in run().
        private boolean lastMessageSucceeded;

        // Number of consecutive run() executions that did not receive any message.
        private int idleTaskExecutionCount = 0;

        // Idle flag, read and written through isIdle()/setIdle().
        private volatile boolean idle = true;

        /**
         * Execute this invoker: register as active, run the receive loop, recover
         * from any failure, and finally either reschedule this invoker or remove it
         * from the set of scheduled invokers.
         */
        @Override
        public void run() {
            // Register as active and wake up any threads waiting on the lifecycle monitor.
            synchronized (lifecycleMonitor) {
                activeInvokerCount++;
                lifecycleMonitor.notifyAll();
            }
            boolean messageReceived = false;
            try {
                if (maxMessagesPerTask < 0) {
                    // Negative limit: loop inside this single task execution.
                    messageReceived = executeOngoingLoop();
                }
                else {
                    // Bounded number of listener invocations per task execution.
                    int messageCount = 0;
                    while (isRunning() && messageCount < maxMessagesPerTask) {
                        messageReceived = (invokeListener() || messageReceived);
                        messageCount++;
                    }
                }
            }
            catch (Throwable ex) {
                clearResources();
                if (!this.lastMessageSucceeded) {
                    // We failed more than once in a row or on startup -
                    // wait before first recovery attempt.
                    waitBeforeRecoveryAttempt();
                }
                this.lastMessageSucceeded = false;
                boolean alreadyRecovered = false;
                synchronized (recoveryMonitor) {
                    // An unchanged marker means no recovery has completed since this invoker
                    // last took its snapshot, so this invoker performs the recovery itself.
                    if (this.lastRecoveryMarker == currentRecoveryMarker) {
                        handleListenerSetupFailure(ex, false);
                        recoverAfterListenerSetupFailure();
                        currentRecoveryMarker = new Object();
                    }
                    else {
                        alreadyRecovered = true;
                    }
                }
                // Reported outside of the recovery monitor.
                if (alreadyRecovered) {
                    handleListenerSetupFailure(ex, true);
                }
            }
            finally {
                synchronized (lifecycleMonitor) {
                    decreaseActiveInvokerCount();
                    lifecycleMonitor.notifyAll();
                }
                // Track consecutive executions without a message.
                if (!messageReceived) {
                    this.idleTaskExecutionCount++;
                }
                else {
                    this.idleTaskExecutionCount = 0;
                }
                synchronized (lifecycleMonitor) {
                    if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
                        // We're shutting down completely.
                        scheduledInvokers.remove(this);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
                        }
                        lifecycleMonitor.notifyAll();
                        clearResources();
                    }
                    else if (isRunning()) {
                        // Rescheduled while running: report if paused tasks leave too few consumers.
                        int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
                        if (nonPausedConsumers < 1) {
                            logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
                                    "Check your thread pool configuration! Manual recovery necessary through a start() call.");
                        }
                        else if (nonPausedConsumers < getConcurrentConsumers()) {
                            logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
                                    "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
                                    "to be triggered by remaining consumers.");
                        }
                    }
                }
            }
        }

        /**
         * Keep invoking the listener for as long as the container is active and the
         * number of scheduled invokers does not exceed "maxConcurrentConsumers",
         * waiting on the lifecycle monitor while the container is not running.
         * @return whether at least one invocation reported a received message
         * @throws JMSException if thrown by a listener invocation
         */
        private boolean executeOngoingLoop() throws JMSException {
            boolean messageReceived = false;
            boolean active = true;
            while (active) {
                synchronized (lifecycleMonitor) {
                    boolean interrupted = false;
                    boolean wasWaiting = false;
                    // Active but not running: wait on the lifecycle monitor.
                    while ((active = isActive()) && !isRunning()) {
                        if (interrupted) {
                            throw new IllegalStateException("Thread was interrupted while waiting for " +
                                    "a restart of the listener container, but container is still stopped");
                        }
                        if (!wasWaiting) {
                            // Only decrease the active count once per waiting phase.
                            decreaseActiveInvokerCount();
                        }
                        wasWaiting = true;
                        try {
                            lifecycleMonitor.wait();
                        }
                        catch (InterruptedException ex) {
                            // Re-interrupt current thread, to allow other threads to react.
                            Thread.currentThread().interrupt();
                            interrupted = true;
                        }
                    }
                    if (wasWaiting) {
                        // Re-register as active after the waiting phase.
                        activeInvokerCount++;
                    }
                    if (scheduledInvokers.size() > maxConcurrentConsumers) {
                        // More invokers scheduled than allowed: let this one end its loop.
                        active = false;
                    }
                }
                if (active) {
                    messageReceived = (invokeListener() || messageReceived);
                }
            }
            return messageReceived;
        }

        /**
         * Set up resources as needed and perform a single receive-and-execute call.
         * @return the result of {@code receiveAndExecute}
         * @throws JMSException if resource setup or the receive call fails
         */
        private boolean invokeListener() throws JMSException {
            initResourcesIfNecessary();
            boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
            this.lastMessageSucceeded = true;
            return messageReceived;
        }

        /**
         * Decrease the active invoker count and run a pending stop callback, if any,
         * once the count has reached zero. The callback is cleared after it has run.
         */
        private void decreaseActiveInvokerCount() {
            activeInvokerCount--;
            if (stopCallback != null && activeInvokerCount == 0) {
                stopCallback.run();
                stopCallback = null;
            }
        }

        /**
         * Create the Session and MessageConsumer held by this invoker where the
         * cache level calls for it, and refresh this invoker's recovery marker.
         * @throws JMSException if Session or consumer creation fails
         */
        private void initResourcesIfNecessary() throws JMSException {
            if (getCacheLevel() <= CACHE_CONNECTION) {
                // Nothing is cached per invoker at this level: only refresh the marker.
                updateRecoveryMarker();
            }
            else {
                if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
                    updateRecoveryMarker();
                    this.session = createSession(getSharedConnection());
                }
                if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
                    this.consumer = createListenerConsumer(this.session);
                    // Counterpart of the decrement in clearResources().
                    synchronized (lifecycleMonitor) {
                        registeredWithDestination++;
                    }
                }
            }
        }

        /**
         * Take a snapshot of the container's current recovery marker,
         * under the recovery monitor.
         */
        private void updateRecoveryMarker() {
            synchronized (recoveryMonitor) {
                this.lastRecoveryMarker = currentRecoveryMarker;
            }
        }

        /**
         * Close the consumer and Session held by this invoker, adjust the
         * registration count if a consumer was held, and reset both fields.
         */
        private void clearResources() {
            if (sharedConnectionEnabled()) {
                // With a shared Connection, close under the shared connection monitor.
                synchronized (sharedConnectionMonitor) {
                    JmsUtils.closeMessageConsumer(this.consumer);
                    JmsUtils.closeSession(this.session);
                }
            }
            else {
                JmsUtils.closeMessageConsumer(this.consumer);
                JmsUtils.closeSession(this.session);
            }
            if (this.consumer != null) {
                synchronized (lifecycleMonitor) {
                    registeredWithDestination--;
                }
            }
            this.consumer = null;
            this.session = null;
        }

        /**
         * Apply the back-off time once. In a regular scenario, the back-off is only applied if we
         * failed to recover with the broker. This additional wait period avoids a burst retry
         * scenario when the broker is actually up but something else is failing (i.e. listener
         * specific).
         */
        private void waitBeforeRecoveryAttempt() {
            BackOffExecution execution = DefaultMessageListenerContainer.this.backOff.start();
            applyBackOffTime(execution);
        }

        /**
         * Reports this task as long-lived when "maxMessagesPerTask" is negative,
         * i.e. when {@link #run()} uses the ongoing loop.
         */
        @Override
        public boolean isLongLived() {
            return (maxMessagesPerTask < 0);
        }

        /**
         * Set the idle flag of this invoker.
         */
        public void setIdle(boolean idle) {
            this.idle = idle;
        }

        /**
         * Return the idle flag of this invoker.
         */
        public boolean isIdle() {
            return this.idle;
        }
    }

}