001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import java.util.Iterator; 020import java.util.LinkedList; 021import java.util.List; 022import javax.jms.Connection; 023import javax.jms.JMSException; 024 025import org.springframework.beans.factory.BeanNameAware; 026import org.springframework.beans.factory.DisposableBean; 027import org.springframework.context.SmartLifecycle; 028import org.springframework.jms.JmsException; 029import org.springframework.jms.connection.ConnectionFactoryUtils; 030import org.springframework.jms.support.JmsUtils; 031import org.springframework.jms.support.destination.JmsDestinationAccessor; 032import org.springframework.util.ClassUtils; 033 034/** 035 * Common base class for all containers which need to implement listening 036 * based on a JMS Connection (either shared or freshly obtained for each attempt). 037 * Inherits basic Connection and Session configuration handling from the 038 * {@link org.springframework.jms.support.JmsAccessor} base class. 039 * 040 * <p>This class provides basic lifecycle management, in particular management 041 * of a shared JMS Connection. Subclasses are supposed to plug into this 042 * lifecycle, implementing the {@link #sharedConnectionEnabled()} as well 043 * as the {@link #doInitialize()} and {@link #doShutdown()} template methods. 044 * 045 * <p>This base class does not assume any specific listener programming model 046 * or listener invoker mechanism. It just provides the general runtime 047 * lifecycle management needed for any kind of JMS-based listening mechanism 048 * that operates on a JMS Connection/Session. 049 * 050 * <p>For a concrete listener programming model, check out the 051 * {@link AbstractMessageListenerContainer} subclass. For a concrete listener 052 * invoker mechanism, check out the {@link DefaultMessageListenerContainer} class. 053 * 054 * @author Juergen Hoeller 055 * @since 2.0.3 056 * @see #sharedConnectionEnabled() 057 * @see #doInitialize() 058 * @see #doShutdown() 059 */ 060public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor 061 implements BeanNameAware, DisposableBean, SmartLifecycle { 062 063 private String clientId; 064 065 private boolean autoStartup = true; 066 067 private int phase = Integer.MAX_VALUE; 068 069 private String beanName; 070 071 private Connection sharedConnection; 072 073 private boolean sharedConnectionStarted = false; 074 075 protected final Object sharedConnectionMonitor = new Object(); 076 077 private boolean active = false; 078 079 private volatile boolean running = false; 080 081 private final List<Object> pausedTasks = new LinkedList<Object>(); 082 083 protected final Object lifecycleMonitor = new Object(); 084 085 086 /** 087 * Specify the JMS client ID for a shared Connection created and used 088 * by this container. 089 * <p>Note that client IDs need to be unique among all active Connections 090 * of the underlying JMS provider. Furthermore, a client ID can only be 091 * assigned if the original ConnectionFactory hasn't already assigned one. 092 * @see javax.jms.Connection#setClientID 093 * @see #setConnectionFactory 094 */ 095 public void setClientId(String clientId) { 096 this.clientId = clientId; 097 } 098 099 /** 100 * Return the JMS client ID for the shared Connection created and used 101 * by this container, if any. 102 */ 103 public String getClientId() { 104 return this.clientId; 105 } 106 107 /** 108 * Set whether to automatically start the container after initialization. 109 * <p>Default is "true"; set this to "false" to allow for manual startup 110 * through the {@link #start()} method. 111 */ 112 public void setAutoStartup(boolean autoStartup) { 113 this.autoStartup = autoStartup; 114 } 115 116 @Override 117 public boolean isAutoStartup() { 118 return this.autoStartup; 119 } 120 121 /** 122 * Specify the phase in which this container should be started and 123 * stopped. The startup order proceeds from lowest to highest, and 124 * the shutdown order is the reverse of that. By default this value 125 * is Integer.MAX_VALUE meaning that this container starts as late 126 * as possible and stops as soon as possible. 127 */ 128 public void setPhase(int phase) { 129 this.phase = phase; 130 } 131 132 /** 133 * Return the phase in which this container will be started and stopped. 134 */ 135 @Override 136 public int getPhase() { 137 return this.phase; 138 } 139 140 @Override 141 public void setBeanName(String beanName) { 142 this.beanName = beanName; 143 } 144 145 /** 146 * Return the bean name that this listener container has been assigned 147 * in its containing bean factory, if any. 148 */ 149 protected final String getBeanName() { 150 return this.beanName; 151 } 152 153 154 /** 155 * Delegates to {@link #validateConfiguration()} and {@link #initialize()}. 156 */ 157 @Override 158 public void afterPropertiesSet() { 159 super.afterPropertiesSet(); 160 validateConfiguration(); 161 initialize(); 162 } 163 164 /** 165 * Validate the configuration of this container. 166 * <p>The default implementation is empty. To be overridden in subclasses. 167 */ 168 protected void validateConfiguration() { 169 } 170 171 /** 172 * Calls {@link #shutdown()} when the BeanFactory destroys the container instance. 173 * @see #shutdown() 174 */ 175 @Override 176 public void destroy() { 177 shutdown(); 178 } 179 180 181 //------------------------------------------------------------------------- 182 // Lifecycle methods for starting and stopping the container 183 //------------------------------------------------------------------------- 184 185 /** 186 * Initialize this container. 187 * <p>Creates a JMS Connection, starts the {@link javax.jms.Connection} 188 * (if {@link #setAutoStartup(boolean) "autoStartup"} hasn't been turned off), 189 * and calls {@link #doInitialize()}. 190 * @throws org.springframework.jms.JmsException if startup failed 191 */ 192 public void initialize() throws JmsException { 193 try { 194 synchronized (this.lifecycleMonitor) { 195 this.active = true; 196 this.lifecycleMonitor.notifyAll(); 197 } 198 doInitialize(); 199 } 200 catch (JMSException ex) { 201 synchronized (this.sharedConnectionMonitor) { 202 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup); 203 this.sharedConnection = null; 204 } 205 throw convertJmsAccessException(ex); 206 } 207 } 208 209 /** 210 * Stop the shared Connection, call {@link #doShutdown()}, 211 * and close this container. 212 * @throws JmsException if shutdown failed 213 */ 214 public void shutdown() throws JmsException { 215 logger.debug("Shutting down JMS listener container"); 216 boolean wasRunning; 217 synchronized (this.lifecycleMonitor) { 218 wasRunning = this.running; 219 this.running = false; 220 this.active = false; 221 this.pausedTasks.clear(); 222 this.lifecycleMonitor.notifyAll(); 223 } 224 225 // Stop shared Connection early, if necessary. 226 if (wasRunning && sharedConnectionEnabled()) { 227 try { 228 stopSharedConnection(); 229 } 230 catch (Throwable ex) { 231 logger.debug("Could not stop JMS Connection on shutdown", ex); 232 } 233 } 234 235 // Shut down the invokers. 236 try { 237 doShutdown(); 238 } 239 catch (JMSException ex) { 240 throw convertJmsAccessException(ex); 241 } 242 finally { 243 if (sharedConnectionEnabled()) { 244 synchronized (this.sharedConnectionMonitor) { 245 ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), false); 246 this.sharedConnection = null; 247 } 248 } 249 } 250 } 251 252 /** 253 * Return whether this container is currently active, 254 * that is, whether it has been set up but not shut down yet. 255 */ 256 public final boolean isActive() { 257 synchronized (this.lifecycleMonitor) { 258 return this.active; 259 } 260 } 261 262 /** 263 * Start this container. 264 * @throws JmsException if starting failed 265 * @see #doStart 266 */ 267 @Override 268 public void start() throws JmsException { 269 try { 270 doStart(); 271 } 272 catch (JMSException ex) { 273 throw convertJmsAccessException(ex); 274 } 275 } 276 277 /** 278 * Start the shared Connection, if any, and notify all invoker tasks. 279 * @throws JMSException if thrown by JMS API methods 280 * @see #startSharedConnection 281 */ 282 protected void doStart() throws JMSException { 283 // Lazily establish a shared Connection, if necessary. 284 if (sharedConnectionEnabled()) { 285 establishSharedConnection(); 286 } 287 288 // Reschedule paused tasks, if any. 289 synchronized (this.lifecycleMonitor) { 290 this.running = true; 291 this.lifecycleMonitor.notifyAll(); 292 resumePausedTasks(); 293 } 294 295 // Start the shared Connection, if any. 296 if (sharedConnectionEnabled()) { 297 startSharedConnection(); 298 } 299 } 300 301 /** 302 * Stop this container. 303 * @throws JmsException if stopping failed 304 * @see #doStop 305 */ 306 @Override 307 public void stop() throws JmsException { 308 try { 309 doStop(); 310 } 311 catch (JMSException ex) { 312 throw convertJmsAccessException(ex); 313 } 314 } 315 316 @Override 317 public void stop(Runnable callback) { 318 stop(); 319 callback.run(); 320 } 321 322 /** 323 * Notify all invoker tasks and stop the shared Connection, if any. 324 * @throws JMSException if thrown by JMS API methods 325 * @see #stopSharedConnection 326 */ 327 protected void doStop() throws JMSException { 328 synchronized (this.lifecycleMonitor) { 329 this.running = false; 330 this.lifecycleMonitor.notifyAll(); 331 } 332 333 if (sharedConnectionEnabled()) { 334 stopSharedConnection(); 335 } 336 } 337 338 /** 339 * Determine whether this container is currently running, 340 * that is, whether it has been started and not stopped yet. 341 * @see #start() 342 * @see #stop() 343 * @see #runningAllowed() 344 */ 345 @Override 346 public final boolean isRunning() { 347 return (this.running && runningAllowed()); 348 } 349 350 /** 351 * Check whether this container's listeners are generally allowed to run. 352 * <p>This implementation always returns {@code true}; the default 'running' 353 * state is purely determined by {@link #start()} / {@link #stop()}. 354 * <p>Subclasses may override this method to check against temporary 355 * conditions that prevent listeners from actually running. In other words, 356 * they may apply further restrictions to the 'running' state, returning 357 * {@code false} if such a restriction prevents listeners from running. 358 */ 359 protected boolean runningAllowed() { 360 return true; 361 } 362 363 364 //------------------------------------------------------------------------- 365 // Management of a shared JMS Connection 366 //------------------------------------------------------------------------- 367 368 /** 369 * Establish a shared Connection for this container. 370 * <p>The default implementation delegates to {@link #createSharedConnection()}, 371 * which does one immediate attempt and throws an exception if it fails. 372 * Can be overridden to have a recovery process in place, retrying 373 * until a Connection can be successfully established. 374 * @throws JMSException if thrown by JMS API methods 375 */ 376 protected void establishSharedConnection() throws JMSException { 377 synchronized (this.sharedConnectionMonitor) { 378 if (this.sharedConnection == null) { 379 this.sharedConnection = createSharedConnection(); 380 logger.debug("Established shared JMS Connection"); 381 } 382 } 383 } 384 385 /** 386 * Refresh the shared Connection that this container holds. 387 * <p>Called on startup and also after an infrastructure exception 388 * that occurred during invoker setup and/or execution. 389 * @throws JMSException if thrown by JMS API methods 390 */ 391 protected final void refreshSharedConnection() throws JMSException { 392 synchronized (this.sharedConnectionMonitor) { 393 ConnectionFactoryUtils.releaseConnection( 394 this.sharedConnection, getConnectionFactory(), this.sharedConnectionStarted); 395 this.sharedConnection = null; 396 this.sharedConnection = createSharedConnection(); 397 if (this.sharedConnectionStarted) { 398 this.sharedConnection.start(); 399 } 400 } 401 } 402 403 /** 404 * Create a shared Connection for this container. 405 * <p>The default implementation creates a standard Connection 406 * and prepares it through {@link #prepareSharedConnection}. 407 * @return the prepared Connection 408 * @throws JMSException if the creation failed 409 */ 410 protected Connection createSharedConnection() throws JMSException { 411 Connection con = createConnection(); 412 try { 413 prepareSharedConnection(con); 414 return con; 415 } 416 catch (JMSException ex) { 417 JmsUtils.closeConnection(con); 418 throw ex; 419 } 420 } 421 422 /** 423 * Prepare the given Connection, which is about to be registered 424 * as shared Connection for this container. 425 * <p>The default implementation sets the specified client id, if any. 426 * Subclasses can override this to apply further settings. 427 * @param connection the Connection to prepare 428 * @throws JMSException if the preparation efforts failed 429 * @see #getClientId() 430 */ 431 protected void prepareSharedConnection(Connection connection) throws JMSException { 432 String clientId = getClientId(); 433 if (clientId != null) { 434 connection.setClientID(clientId); 435 } 436 } 437 438 /** 439 * Start the shared Connection. 440 * @throws JMSException if thrown by JMS API methods 441 * @see javax.jms.Connection#start() 442 */ 443 protected void startSharedConnection() throws JMSException { 444 synchronized (this.sharedConnectionMonitor) { 445 this.sharedConnectionStarted = true; 446 if (this.sharedConnection != null) { 447 try { 448 this.sharedConnection.start(); 449 } 450 catch (javax.jms.IllegalStateException ex) { 451 logger.debug("Ignoring Connection start exception - assuming already started: " + ex); 452 } 453 } 454 } 455 } 456 457 /** 458 * Stop the shared Connection. 459 * @throws JMSException if thrown by JMS API methods 460 * @see javax.jms.Connection#start() 461 */ 462 protected void stopSharedConnection() throws JMSException { 463 synchronized (this.sharedConnectionMonitor) { 464 this.sharedConnectionStarted = false; 465 if (this.sharedConnection != null) { 466 try { 467 this.sharedConnection.stop(); 468 } 469 catch (javax.jms.IllegalStateException ex) { 470 logger.debug("Ignoring Connection stop exception - assuming already stopped: " + ex); 471 } 472 } 473 } 474 } 475 476 /** 477 * Return the shared JMS Connection maintained by this container. 478 * Available after initialization. 479 * @return the shared Connection (never {@code null}) 480 * @throws IllegalStateException if this container does not maintain a 481 * shared Connection, or if the Connection hasn't been initialized yet 482 * @see #sharedConnectionEnabled() 483 */ 484 protected final Connection getSharedConnection() { 485 if (!sharedConnectionEnabled()) { 486 throw new IllegalStateException( 487 "This listener container does not maintain a shared Connection"); 488 } 489 synchronized (this.sharedConnectionMonitor) { 490 if (this.sharedConnection == null) { 491 throw new SharedConnectionNotInitializedException( 492 "This listener container's shared Connection has not been initialized yet"); 493 } 494 return this.sharedConnection; 495 } 496 } 497 498 499 //------------------------------------------------------------------------- 500 // Management of paused tasks 501 //------------------------------------------------------------------------- 502 503 /** 504 * Take the given task object and reschedule it, either immediately if 505 * this container is currently running, or later once this container 506 * has been restarted. 507 * <p>If this container has already been shut down, the task will not 508 * get rescheduled at all. 509 * @param task the task object to reschedule 510 * @return whether the task has been rescheduled 511 * (either immediately or for a restart of this container) 512 * @see #doRescheduleTask 513 */ 514 protected final boolean rescheduleTaskIfNecessary(Object task) { 515 if (this.running) { 516 try { 517 doRescheduleTask(task); 518 } 519 catch (RuntimeException ex) { 520 logRejectedTask(task, ex); 521 this.pausedTasks.add(task); 522 } 523 return true; 524 } 525 else if (this.active) { 526 this.pausedTasks.add(task); 527 return true; 528 } 529 else { 530 return false; 531 } 532 } 533 534 /** 535 * Try to resume all paused tasks. 536 * Tasks for which rescheduling failed simply remain in paused mode. 537 */ 538 protected void resumePausedTasks() { 539 synchronized (this.lifecycleMonitor) { 540 if (!this.pausedTasks.isEmpty()) { 541 for (Iterator<?> it = this.pausedTasks.iterator(); it.hasNext();) { 542 Object task = it.next(); 543 try { 544 doRescheduleTask(task); 545 it.remove(); 546 if (logger.isDebugEnabled()) { 547 logger.debug("Resumed paused task: " + task); 548 } 549 } 550 catch (RuntimeException ex) { 551 logRejectedTask(task, ex); 552 // Keep the task in paused mode... 553 } 554 } 555 } 556 } 557 } 558 559 /** 560 * Determine the number of currently paused tasks, if any. 561 */ 562 public int getPausedTaskCount() { 563 synchronized (this.lifecycleMonitor) { 564 return this.pausedTasks.size(); 565 } 566 } 567 568 /** 569 * Reschedule the given task object immediately. 570 * <p>To be implemented by subclasses if they ever call 571 * {@code rescheduleTaskIfNecessary}. 572 * This implementation throws an UnsupportedOperationException. 573 * @param task the task object to reschedule 574 * @see #rescheduleTaskIfNecessary 575 */ 576 protected void doRescheduleTask(Object task) { 577 throw new UnsupportedOperationException( 578 ClassUtils.getShortName(getClass()) + " does not support rescheduling of tasks"); 579 } 580 581 /** 582 * Log a task that has been rejected by {@link #doRescheduleTask}. 583 * <p>The default implementation simply logs a corresponding message 584 * at debug level. 585 * @param task the rejected task object 586 * @param ex the exception thrown from {@link #doRescheduleTask} 587 */ 588 protected void logRejectedTask(Object task, RuntimeException ex) { 589 if (logger.isDebugEnabled()) { 590 logger.debug("Listener container task [" + task + "] has been rejected and paused: " + ex); 591 } 592 } 593 594 595 //------------------------------------------------------------------------- 596 // Template methods to be implemented by subclasses 597 //------------------------------------------------------------------------- 598 599 /** 600 * Return whether a shared JMS Connection should be maintained 601 * by this container base class. 602 * @see #getSharedConnection() 603 */ 604 protected abstract boolean sharedConnectionEnabled(); 605 606 /** 607 * Register any invokers within this container. 608 * <p>Subclasses need to implement this method for their specific 609 * invoker management process. 610 * <p>A shared JMS Connection, if any, will already have been 611 * started at this point. 612 * @throws JMSException if registration failed 613 * @see #getSharedConnection() 614 */ 615 protected abstract void doInitialize() throws JMSException; 616 617 /** 618 * Close the registered invokers. 619 * <p>Subclasses need to implement this method for their specific 620 * invoker management process. 621 * <p>A shared JMS Connection, if any, will automatically be closed 622 * <i>afterwards</i>. 623 * @throws JMSException if shutdown failed 624 * @see #shutdown() 625 */ 626 protected abstract void doShutdown() throws JMSException; 627 628 629 /** 630 * Exception that indicates that the initial setup of this container's 631 * shared JMS Connection failed. This is indicating to invokers that they need 632 * to establish the shared Connection themselves on first access. 633 */ 634 @SuppressWarnings("serial") 635 public static class SharedConnectionNotInitializedException extends RuntimeException { 636 637 /** 638 * Create a new SharedConnectionNotInitializedException. 639 * @param msg the detail message 640 */ 641 protected SharedConnectionNotInitializedException(String msg) { 642 super(msg); 643 } 644 } 645 646}