001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import java.util.Iterator; 020import java.util.LinkedList; 021import java.util.List; 022 023import javax.jms.Connection; 024import javax.jms.JMSException; 025 026import org.springframework.beans.factory.BeanNameAware; 027import org.springframework.beans.factory.DisposableBean; 028import org.springframework.context.SmartLifecycle; 029import org.springframework.jms.JmsException; 030import org.springframework.jms.connection.ConnectionFactoryUtils; 031import org.springframework.jms.support.JmsUtils; 032import org.springframework.jms.support.destination.JmsDestinationAccessor; 033import org.springframework.lang.Nullable; 034import org.springframework.util.ClassUtils; 035 036/** 037 * Common base class for all containers which need to implement listening 038 * based on a JMS Connection (either shared or freshly obtained for each attempt). 039 * Inherits basic Connection and Session configuration handling from the 040 * {@link org.springframework.jms.support.JmsAccessor} base class. 041 * 042 * <p>This class provides basic lifecycle management, in particular management 043 * of a shared JMS Connection. Subclasses are supposed to plug into this 044 * lifecycle, implementing the {@link #sharedConnectionEnabled()} as well 045 * as the {@link #doInitialize()} and {@link #doShutdown()} template methods. 046 * 047 * <p>This base class does not assume any specific listener programming model 048 * or listener invoker mechanism. It just provides the general runtime 049 * lifecycle management needed for any kind of JMS-based listening mechanism 050 * that operates on a JMS Connection/Session. 051 * 052 * <p>For a concrete listener programming model, check out the 053 * {@link AbstractMessageListenerContainer} subclass. For a concrete listener 054 * invoker mechanism, check out the {@link DefaultMessageListenerContainer} class. 055 * 056 * @author Juergen Hoeller 057 * @since 2.0.3 058 * @see #sharedConnectionEnabled() 059 * @see #doInitialize() 060 * @see #doShutdown() 061 */ 062public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor 063 implements BeanNameAware, DisposableBean, SmartLifecycle { 064 065 @Nullable 066 private String clientId; 067 068 private boolean autoStartup = true; 069 070 private int phase = DEFAULT_PHASE; 071 072 @Nullable 073 private String beanName; 074 075 @Nullable 076 private Connection sharedConnection; 077 078 private boolean sharedConnectionStarted = false; 079 080 protected final Object sharedConnectionMonitor = new Object(); 081 082 private boolean active = false; 083 084 private volatile boolean running = false; 085 086 private final List<Object> pausedTasks = new LinkedList<>(); 087 088 protected final Object lifecycleMonitor = new Object(); 089 090 091 /** 092 * Specify the JMS client ID for a shared Connection created and used 093 * by this container. 094 * <p>Note that client IDs need to be unique among all active Connections 095 * of the underlying JMS provider. Furthermore, a client ID can only be 096 * assigned if the original ConnectionFactory hasn't already assigned one. 097 * @see javax.jms.Connection#setClientID 098 * @see #setConnectionFactory 099 */ 100 public void setClientId(@Nullable String clientId) { 101 this.clientId = clientId; 102 } 103 104 /** 105 * Return the JMS client ID for the shared Connection created and used 106 * by this container, if any. 107 */ 108 @Nullable 109 public String getClientId() { 110 return this.clientId; 111 } 112 113 /** 114 * Set whether to automatically start the container after initialization. 115 * <p>Default is "true"; set this to "false" to allow for manual startup 116 * through the {@link #start()} method. 117 */ 118 public void setAutoStartup(boolean autoStartup) { 119 this.autoStartup = autoStartup; 120 } 121 122 @Override 123 public boolean isAutoStartup() { 124 return this.autoStartup; 125 } 126 127 /** 128 * Specify the phase in which this container should be started and 129 * stopped. The startup order proceeds from lowest to highest, and 130 * the shutdown order is the reverse of that. By default this value 131 * is Integer.MAX_VALUE meaning that this container starts as late 132 * as possible and stops as soon as possible. 133 */ 134 public void setPhase(int phase) { 135 this.phase = phase; 136 } 137 138 /** 139 * Return the phase in which this container will be started and stopped. 140 */ 141 @Override 142 public int getPhase() { 143 return this.phase; 144 } 145 146 @Override 147 public void setBeanName(@Nullable String beanName) { 148 this.beanName = beanName; 149 } 150 151 /** 152 * Return the bean name that this listener container has been assigned 153 * in its containing bean factory, if any. 154 */ 155 @Nullable 156 protected final String getBeanName() { 157 return this.beanName; 158 } 159 160 161 /** 162 * Delegates to {@link #validateConfiguration()} and {@link #initialize()}. 163 */ 164 @Override 165 public void afterPropertiesSet() { 166 super.afterPropertiesSet(); 167 validateConfiguration(); 168 initialize(); 169 } 170 171 /** 172 * Validate the configuration of this container. 173 * <p>The default implementation is empty. To be overridden in subclasses. 174 */ 175 protected void validateConfiguration() { 176 } 177 178 /** 179 * Calls {@link #shutdown()} when the BeanFactory destroys the container instance. 180 * @see #shutdown() 181 */ 182 @Override 183 public void destroy() { 184 shutdown(); 185 } 186 187 188 //------------------------------------------------------------------------- 189 // Lifecycle methods for starting and stopping the container 190 //------------------------------------------------------------------------- 191 192 /** 193 * Initialize this container. 194 * <p>Creates a JMS Connection, starts the {@link javax.jms.Connection} 195 * (if {@link #setAutoStartup(boolean) "autoStartup"} hasn't been turned off), 196 * and calls {@link #doInitialize()}. 197 * @throws org.springframework.jms.JmsException if startup failed 198 */ 199 public void initialize() throws JmsException { 200 try { 201 synchronized (this.lifecycleMonitor) { 202 this.active = true; 203 this.lifecycleMonitor.notifyAll(); 204 } 205 doInitialize(); 206 } 207 catch (JMSException ex) { 208 synchronized (this.sharedConnectionMonitor) { 209 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); 210 this.sharedConnection = null; 211 } 212 throw convertJmsAccessException(ex); 213 } 214 } 215 216 /** 217 * Stop the shared Connection, call {@link #doShutdown()}, 218 * and close this container. 219 * @throws JmsException if shutdown failed 220 */ 221 public void shutdown() throws JmsException { 222 logger.debug("Shutting down JMS listener container"); 223 boolean wasRunning; 224 synchronized (this.lifecycleMonitor) { 225 wasRunning = this.running; 226 this.running = false; 227 this.active = false; 228 this.pausedTasks.clear(); 229 this.lifecycleMonitor.notifyAll(); 230 } 231 232 // Stop shared Connection early, if necessary. 233 if (wasRunning && sharedConnectionEnabled()) { 234 try { 235 stopSharedConnection(); 236 } 237 catch (Throwable ex) { 238 logger.debug("Could not stop JMS Connection on shutdown", ex); 239 } 240 } 241 242 // Shut down the invokers. 243 try { 244 doShutdown(); 245 } 246 catch (JMSException ex) { 247 throw convertJmsAccessException(ex); 248 } 249 finally { 250 if (sharedConnectionEnabled()) { 251 synchronized (this.sharedConnectionMonitor) { 252 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), false); 253 this.sharedConnection = null; 254 } 255 } 256 } 257 } 258 259 /** 260 * Return whether this container is currently active, 261 * that is, whether it has been set up but not shut down yet. 262 */ 263 public final boolean isActive() { 264 synchronized (this.lifecycleMonitor) { 265 return this.active; 266 } 267 } 268 269 /** 270 * Start this container. 271 * @throws JmsException if starting failed 272 * @see #doStart 273 */ 274 @Override 275 public void start() throws JmsException { 276 try { 277 doStart(); 278 } 279 catch (JMSException ex) { 280 throw convertJmsAccessException(ex); 281 } 282 } 283 284 /** 285 * Start the shared Connection, if any, and notify all invoker tasks. 286 * @throws JMSException if thrown by JMS API methods 287 * @see #startSharedConnection 288 */ 289 protected void doStart() throws JMSException { 290 // Lazily establish a shared Connection, if necessary. 291 if (sharedConnectionEnabled()) { 292 establishSharedConnection(); 293 } 294 295 // Reschedule paused tasks, if any. 296 synchronized (this.lifecycleMonitor) { 297 this.running = true; 298 this.lifecycleMonitor.notifyAll(); 299 resumePausedTasks(); 300 } 301 302 // Start the shared Connection, if any. 303 if (sharedConnectionEnabled()) { 304 startSharedConnection(); 305 } 306 } 307 308 /** 309 * Stop this container. 310 * @throws JmsException if stopping failed 311 * @see #doStop 312 */ 313 @Override 314 public void stop() throws JmsException { 315 try { 316 doStop(); 317 } 318 catch (JMSException ex) { 319 throw convertJmsAccessException(ex); 320 } 321 } 322 323 /** 324 * Notify all invoker tasks and stop the shared Connection, if any. 325 * @throws JMSException if thrown by JMS API methods 326 * @see #stopSharedConnection 327 */ 328 protected void doStop() throws JMSException { 329 synchronized (this.lifecycleMonitor) { 330 this.running = false; 331 this.lifecycleMonitor.notifyAll(); 332 } 333 334 if (sharedConnectionEnabled()) { 335 stopSharedConnection(); 336 } 337 } 338 339 /** 340 * Determine whether this container is currently running, 341 * that is, whether it has been started and not stopped yet. 342 * @see #start() 343 * @see #stop() 344 * @see #runningAllowed() 345 */ 346 @Override 347 public final boolean isRunning() { 348 return (this.running && runningAllowed()); 349 } 350 351 /** 352 * Check whether this container's listeners are generally allowed to run. 353 * <p>This implementation always returns {@code true}; the default 'running' 354 * state is purely determined by {@link #start()} / {@link #stop()}. 355 * <p>Subclasses may override this method to check against temporary 356 * conditions that prevent listeners from actually running. In other words, 357 * they may apply further restrictions to the 'running' state, returning 358 * {@code false} if such a restriction prevents listeners from running. 359 */ 360 protected boolean runningAllowed() { 361 return true; 362 } 363 364 365 //------------------------------------------------------------------------- 366 // Management of a shared JMS Connection 367 //------------------------------------------------------------------------- 368 369 /** 370 * Establish a shared Connection for this container. 371 * <p>The default implementation delegates to {@link #createSharedConnection()}, 372 * which does one immediate attempt and throws an exception if it fails. 373 * Can be overridden to have a recovery process in place, retrying 374 * until a Connection can be successfully established. 375 * @throws JMSException if thrown by JMS API methods 376 */ 377 protected void establishSharedConnection() throws JMSException { 378 synchronized (this.sharedConnectionMonitor) { 379 if (this.sharedConnection == null) { 380 this.sharedConnection = createSharedConnection(); 381 logger.debug("Established shared JMS Connection"); 382 } 383 } 384 } 385 386 /** 387 * Refresh the shared Connection that this container holds. 388 * <p>Called on startup and also after an infrastructure exception 389 * that occurred during invoker setup and/or execution. 390 * @throws JMSException if thrown by JMS API methods 391 */ 392 protected final void refreshSharedConnection() throws JMSException { 393 synchronized (this.sharedConnectionMonitor) { 394 ConnectionFactoryUtils.releaseConnection( 395 this.sharedConnection, getConnectionFactory(), this.sharedConnectionStarted); 396 this.sharedConnection = null; 397 this.sharedConnection = createSharedConnection(); 398 if (this.sharedConnectionStarted) { 399 this.sharedConnection.start(); 400 } 401 } 402 } 403 404 /** 405 * Create a shared Connection for this container. 406 * <p>The default implementation creates a standard Connection 407 * and prepares it through {@link #prepareSharedConnection}. 408 * @return the prepared Connection 409 * @throws JMSException if the creation failed 410 */ 411 protected Connection createSharedConnection() throws JMSException { 412 Connection con = createConnection(); 413 try { 414 prepareSharedConnection(con); 415 return con; 416 } 417 catch (JMSException ex) { 418 JmsUtils.closeConnection(con); 419 throw ex; 420 } 421 } 422 423 /** 424 * Prepare the given Connection, which is about to be registered 425 * as shared Connection for this container. 426 * <p>The default implementation sets the specified client id, if any. 427 * Subclasses can override this to apply further settings. 428 * @param connection the Connection to prepare 429 * @throws JMSException if the preparation efforts failed 430 * @see #getClientId() 431 */ 432 protected void prepareSharedConnection(Connection connection) throws JMSException { 433 String clientId = getClientId(); 434 if (clientId != null) { 435 connection.setClientID(clientId); 436 } 437 } 438 439 /** 440 * Start the shared Connection. 441 * @throws JMSException if thrown by JMS API methods 442 * @see javax.jms.Connection#start() 443 */ 444 protected void startSharedConnection() throws JMSException { 445 synchronized (this.sharedConnectionMonitor) { 446 this.sharedConnectionStarted = true; 447 if (this.sharedConnection != null) { 448 try { 449 this.sharedConnection.start(); 450 } 451 catch (javax.jms.IllegalStateException ex) { 452 logger.debug("Ignoring Connection start exception - assuming already started: " + ex); 453 } 454 } 455 } 456 } 457 458 /** 459 * Stop the shared Connection. 460 * @throws JMSException if thrown by JMS API methods 461 * @see javax.jms.Connection#start() 462 */ 463 protected void stopSharedConnection() throws JMSException { 464 synchronized (this.sharedConnectionMonitor) { 465 this.sharedConnectionStarted = false; 466 if (this.sharedConnection != null) { 467 try { 468 this.sharedConnection.stop(); 469 } 470 catch (javax.jms.IllegalStateException ex) { 471 logger.debug("Ignoring Connection stop exception - assuming already stopped: " + ex); 472 } 473 } 474 } 475 } 476 477 /** 478 * Return the shared JMS Connection maintained by this container. 479 * Available after initialization. 480 * @return the shared Connection (never {@code null}) 481 * @throws IllegalStateException if this container does not maintain a 482 * shared Connection, or if the Connection hasn't been initialized yet 483 * @see #sharedConnectionEnabled() 484 */ 485 protected final Connection getSharedConnection() { 486 if (!sharedConnectionEnabled()) { 487 throw new IllegalStateException( 488 "This listener container does not maintain a shared Connection"); 489 } 490 synchronized (this.sharedConnectionMonitor) { 491 if (this.sharedConnection == null) { 492 throw new SharedConnectionNotInitializedException( 493 "This listener container's shared Connection has not been initialized yet"); 494 } 495 return this.sharedConnection; 496 } 497 } 498 499 500 //------------------------------------------------------------------------- 501 // Management of paused tasks 502 //------------------------------------------------------------------------- 503 504 /** 505 * Take the given task object and reschedule it, either immediately if 506 * this container is currently running, or later once this container 507 * has been restarted. 508 * <p>If this container has already been shut down, the task will not 509 * get rescheduled at all. 510 * @param task the task object to reschedule 511 * @return whether the task has been rescheduled 512 * (either immediately or for a restart of this container) 513 * @see #doRescheduleTask 514 */ 515 protected final boolean rescheduleTaskIfNecessary(Object task) { 516 if (this.running) { 517 try { 518 doRescheduleTask(task); 519 } 520 catch (RuntimeException ex) { 521 logRejectedTask(task, ex); 522 this.pausedTasks.add(task); 523 } 524 return true; 525 } 526 else if (this.active) { 527 this.pausedTasks.add(task); 528 return true; 529 } 530 else { 531 return false; 532 } 533 } 534 535 /** 536 * Try to resume all paused tasks. 537 * Tasks for which rescheduling failed simply remain in paused mode. 538 */ 539 protected void resumePausedTasks() { 540 synchronized (this.lifecycleMonitor) { 541 if (!this.pausedTasks.isEmpty()) { 542 for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) { 543 Object task = it.next(); 544 try { 545 doRescheduleTask(task); 546 it.remove(); 547 if (logger.isDebugEnabled()) { 548 logger.debug("Resumed paused task: " + task); 549 } 550 } 551 catch (RuntimeException ex) { 552 logRejectedTask(task, ex); 553 // Keep the task in paused mode... 554 } 555 } 556 } 557 } 558 } 559 560 /** 561 * Determine the number of currently paused tasks, if any. 562 */ 563 public int getPausedTaskCount() { 564 synchronized (this.lifecycleMonitor) { 565 return this.pausedTasks.size(); 566 } 567 } 568 569 /** 570 * Reschedule the given task object immediately. 571 * <p>To be implemented by subclasses if they ever call 572 * {@code rescheduleTaskIfNecessary}. 573 * This implementation throws an UnsupportedOperationException. 574 * @param task the task object to reschedule 575 * @see #rescheduleTaskIfNecessary 576 */ 577 protected void doRescheduleTask(Object task) { 578 throw new UnsupportedOperationException( 579 ClassUtils.getShortName(getClass()) + " does not support rescheduling of tasks"); 580 } 581 582 /** 583 * Log a task that has been rejected by {@link #doRescheduleTask}. 584 * <p>The default implementation simply logs a corresponding message 585 * at debug level. 586 * @param task the rejected task object 587 * @param ex the exception thrown from {@link #doRescheduleTask} 588 */ 589 protected void logRejectedTask(Object task, RuntimeException ex) { 590 if (logger.isDebugEnabled()) { 591 logger.debug("Listener container task [" + task + "] has been rejected and paused: " + ex); 592 } 593 } 594 595 596 //------------------------------------------------------------------------- 597 // Template methods to be implemented by subclasses 598 //------------------------------------------------------------------------- 599 600 /** 601 * Return whether a shared JMS Connection should be maintained 602 * by this container base class. 603 * @see #getSharedConnection() 604 */ 605 protected abstract boolean sharedConnectionEnabled(); 606 607 /** 608 * Register any invokers within this container. 609 * <p>Subclasses need to implement this method for their specific 610 * invoker management process. 611 * <p>A shared JMS Connection, if any, will already have been 612 * started at this point. 613 * @throws JMSException if registration failed 614 * @see #getSharedConnection() 615 */ 616 protected abstract void doInitialize() throws JMSException; 617 618 /** 619 * Close the registered invokers. 620 * <p>Subclasses need to implement this method for their specific 621 * invoker management process. 622 * <p>A shared JMS Connection, if any, will automatically be closed 623 * <i>afterwards</i>. 624 * @throws JMSException if shutdown failed 625 * @see #shutdown() 626 */ 627 protected abstract void doShutdown() throws JMSException; 628 629 630 /** 631 * Exception that indicates that the initial setup of this container's 632 * shared JMS Connection failed. This is indicating to invokers that they need 633 * to establish the shared Connection themselves on first access. 634 */ 635 @SuppressWarnings("serial") 636 public static class SharedConnectionNotInitializedException extends RuntimeException { 637 638 /** 639 * Create a new SharedConnectionNotInitializedException. 640 * @param msg the detail message 641 */ 642 protected SharedConnectionNotInitializedException(String msg) { 643 super(msg); 644 } 645 } 646 647}