/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.jms.connection;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * A JMS ConnectionFactory adapter that returns the same Connection
 * from all {@link #createConnection()} calls, and ignores calls to
 * {@link javax.jms.Connection#close()}. According to the JMS Connection
 * model, this is perfectly thread-safe (in contrast to e.g. JDBC). The
 * shared Connection can be automatically recovered in case of an Exception.
 *
 * <p>You can either pass in a specific JMS Connection directly or let this
 * factory lazily create a Connection via a given target ConnectionFactory.
 * This factory generally works with JMS 1.1 as well as the JMS 1.0.2 API.
 *
 * <p>Note that when using the JMS 1.0.2 API, this ConnectionFactory will switch
 * into queue/topic mode according to the JMS API methods used at runtime:
 * {@code createQueueConnection} and {@code createTopicConnection} will
 * lead to queue/topic mode, respectively; generic {@code createConnection}
 * calls will lead to a JMS 1.1 connection which is able to serve both modes.
 *
 * <p>Useful for testing and standalone environments in order to keep using the
 * same Connection for multiple {@link org.springframework.jms.core.JmsTemplate}
 * calls, without having a pooling ConnectionFactory underneath. This may span
 * any number of transactions, even concurrently executing transactions.
 *
 * <p>Note that Spring's message listener containers support the use of
 * a shared Connection within each listener container instance. Using
 * SingleConnectionFactory in combination only really makes sense for
 * sharing a single JMS Connection <i>across multiple listener containers</i>.
 *
 * @author Juergen Hoeller
 * @author Mark Pollack
 * @since 1.1
 * @see org.springframework.jms.core.JmsTemplate
 * @see org.springframework.jms.listener.SimpleMessageListenerContainer
 * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setCacheLevel
 */
public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
        TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean {

    protected final Log logger = LogFactory.getLog(getClass());

    private ConnectionFactory targetConnectionFactory;

    private String clientId;

    private ExceptionListener exceptionListener;

    private boolean reconnectOnException = false;

    /** The target Connection */
    private Connection connection;

    /** A hint whether to create a queue or topic connection */
    private Boolean pubSubMode;

    /** An internal aggregator allowing for per-connection ExceptionListeners */
    private AggregatedExceptionListener aggregatedExceptionListener;

    /**
     * Number of connection proxies that currently consider themselves started.
     * The shared Connection is started when this goes from 0 to 1 and stopped
     * when it drops back from 1 to 0.
     */
    private int startedCount = 0;

    /** Synchronization monitor for the shared Connection */
    private final Object connectionMonitor = new Object();


    /**
     * Create a new SingleConnectionFactory for bean-style usage.
     * @see #setTargetConnectionFactory
     */
    public SingleConnectionFactory() {
    }

    /**
     * Create a new SingleConnectionFactory that always returns the given Connection.
     * @param targetConnection the single Connection
     */
    public SingleConnectionFactory(Connection targetConnection) {
        Assert.notNull(targetConnection, "Target Connection must not be null");
        this.connection = targetConnection;
    }

    /**
     * Create a new SingleConnectionFactory that always returns a single Connection
     * that it will lazily create via the given target ConnectionFactory.
     * @param targetConnectionFactory the target ConnectionFactory
     */
    public SingleConnectionFactory(ConnectionFactory targetConnectionFactory) {
        Assert.notNull(targetConnectionFactory, "Target ConnectionFactory must not be null");
        this.targetConnectionFactory = targetConnectionFactory;
    }


    /**
     * Set the target ConnectionFactory which will be used to lazily
     * create a single Connection.
     */
    public void setTargetConnectionFactory(ConnectionFactory targetConnectionFactory) {
        this.targetConnectionFactory = targetConnectionFactory;
    }

    /**
     * Return the target ConnectionFactory which will be used to lazily
     * create a single Connection, if any.
     */
    public ConnectionFactory getTargetConnectionFactory() {
        return this.targetConnectionFactory;
    }

    /**
     * Specify a JMS client ID for the single Connection created and exposed
     * by this ConnectionFactory.
     * <p>Note that client IDs need to be unique among all active Connections
     * of the underlying JMS provider. Furthermore, a client ID can only be
     * assigned if the original ConnectionFactory hasn't already assigned one.
     * @see javax.jms.Connection#setClientID
     * @see #setTargetConnectionFactory
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Return a JMS client ID for the single Connection created and exposed
     * by this ConnectionFactory, if any.
     */
    protected String getClientId() {
        return this.clientId;
    }

    /**
     * Specify a JMS ExceptionListener implementation that should be
     * registered with the single Connection created by this factory.
     * @see #setReconnectOnException
     */
    public void setExceptionListener(ExceptionListener exceptionListener) {
        this.exceptionListener = exceptionListener;
    }

    /**
     * Return the JMS ExceptionListener implementation that should be registered
     * with the single Connection created by this factory, if any.
     */
    protected ExceptionListener getExceptionListener() {
        return this.exceptionListener;
    }

    /**
     * Specify whether the single Connection should be reset (to be subsequently renewed)
     * when a JMSException is reported by the underlying Connection.
     * <p>Default is "false". Switch this to "true" to automatically trigger
     * recovery based on your JMS provider's exception notifications.
     * <p>Internally, this will lead to a special JMS ExceptionListener
     * (this SingleConnectionFactory itself) being registered with the
     * underlying Connection. This can also be combined with a
     * user-specified ExceptionListener, if desired.
     * @see #setExceptionListener
     */
    public void setReconnectOnException(boolean reconnectOnException) {
        this.reconnectOnException = reconnectOnException;
    }

    /**
     * Return whether the single Connection should be renewed when
     * a JMSException is reported by the underlying Connection.
     */
    protected boolean isReconnectOnException() {
        return this.reconnectOnException;
    }

    /**
     * Make sure a Connection or ConnectionFactory has been set.
     * @throws IllegalArgumentException if neither has been provided
     */
    @Override
    public void afterPropertiesSet() {
        if (this.connection == null && getTargetConnectionFactory() == null) {
            throw new IllegalArgumentException("Target Connection or ConnectionFactory is required");
        }
    }


    /**
     * Return a close-suppressing proxy for the shared Connection,
     * lazily initializing the shared Connection if necessary.
     */
    @Override
    public Connection createConnection() throws JMSException {
        return getSharedConnectionProxy(getConnection());
    }

    /**
     * Not supported: always throws {@link javax.jms.IllegalStateException}.
     */
    @Override
    public Connection createConnection(String username, String password) throws JMSException {
        throw new javax.jms.IllegalStateException(
                "SingleConnectionFactory does not support custom username and password");
    }

    /**
     * Switch this factory into queue mode and return a proxy for the shared Connection.
     * @throws javax.jms.IllegalStateException if the shared Connection
     * is not a QueueConnection
     */
    @Override
    public QueueConnection createQueueConnection() throws JMSException {
        Connection con;
        synchronized (this.connectionMonitor) {
            // The mode hint must be in place before a Connection gets lazily created.
            this.pubSubMode = Boolean.FALSE;
            con = createConnection();
        }
        if (!(con instanceof QueueConnection)) {
            throw new javax.jms.IllegalStateException(
                    "This SingleConnectionFactory does not hold a QueueConnection but rather: " + con);
        }
        return ((QueueConnection) con);
    }

    /**
     * Not supported: always throws {@link javax.jms.IllegalStateException}.
     */
    @Override
    public QueueConnection createQueueConnection(String username, String password) throws JMSException {
        throw new javax.jms.IllegalStateException(
                "SingleConnectionFactory does not support custom username and password");
    }

    /**
     * Switch this factory into topic mode and return a proxy for the shared Connection.
     * @throws javax.jms.IllegalStateException if the shared Connection
     * is not a TopicConnection
     */
    @Override
    public TopicConnection createTopicConnection() throws JMSException {
        Connection con;
        synchronized (this.connectionMonitor) {
            // The mode hint must be in place before a Connection gets lazily created.
            this.pubSubMode = Boolean.TRUE;
            con = createConnection();
        }
        if (!(con instanceof TopicConnection)) {
            throw new javax.jms.IllegalStateException(
                    "This SingleConnectionFactory does not hold a TopicConnection but rather: " + con);
        }
        return ((TopicConnection) con);
    }

    /**
     * Not supported: always throws {@link javax.jms.IllegalStateException}.
     */
    @Override
    public TopicConnection createTopicConnection(String username, String password) throws JMSException {
        throw new javax.jms.IllegalStateException(
                "SingleConnectionFactory does not support custom username and password");
    }


    /**
     * Obtain an initialized shared Connection.
     * @return the Connection (never {@code null})
     * @throws javax.jms.JMSException if thrown by JMS API methods
     * @see #initConnection()
     */
    protected Connection getConnection() throws JMSException {
        synchronized (this.connectionMonitor) {
            if (this.connection == null) {
                initConnection();
            }
            return this.connection;
        }
    }

    /**
     * Initialize the underlying shared Connection.
     * <p>Closes and reinitializes the Connection if an underlying
     * Connection is present already.
     * <p>The new Connection only becomes the shared Connection once it has been
     * created, prepared and (if required) started successfully. If any of those
     * steps fails, the new Connection is closed again and no shared Connection
     * is held, so that the next access retries initialization.
     * @throws javax.jms.JMSException if thrown by JMS API methods
     * @throws IllegalStateException if no target ConnectionFactory has been set
     * @see #prepareConnection
     */
    public void initConnection() throws JMSException {
        if (getTargetConnectionFactory() == null) {
            throw new IllegalStateException(
                    "'targetConnectionFactory' is required for lazily initializing a Connection");
        }
        synchronized (this.connectionMonitor) {
            if (this.connection != null) {
                closeConnection(this.connection);
                // Forget the closed Connection right away: if creating the replacement
                // fails below, we must not keep handing out a closed Connection.
                this.connection = null;
            }
            // Work on a local reference and publish it only once fully set up, so that
            // a failure never leaves a Connection without client id / ExceptionListener
            // behind as the shared Connection.
            Connection con = doCreateConnection();
            boolean initialized = false;
            try {
                prepareConnection(con);
                if (this.startedCount > 0) {
                    con.start();
                }
                initialized = true;
            }
            finally {
                if (!initialized) {
                    // Release provider resources held by the unusable Connection;
                    // the original failure keeps propagating to the caller.
                    try {
                        con.close();
                    }
                    catch (Throwable closeEx) {
                        logger.debug("Could not close newly obtained JMS Connection that failed to prepare",
                                closeEx);
                    }
                }
            }
            this.connection = con;
            if (logger.isInfoEnabled()) {
                logger.info("Established shared JMS Connection: " + this.connection);
            }
        }
    }

    /**
     * Exception listener callback that renews the underlying single Connection.
     * @see #resetConnection()
     */
    @Override
    public void onException(JMSException ex) {
        logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex);
        resetConnection();
    }

    /**
     * Close the underlying shared connection.
     * The provider of this ConnectionFactory needs to care for proper shutdown.
     * <p>As this bean implements DisposableBean, a bean factory will
     * automatically invoke this on destruction of its cached singletons.
     * @see #resetConnection()
     */
    @Override
    public void destroy() {
        resetConnection();
    }

    /**
     * Reset the underlying shared Connection, to be reinitialized on next access.
     * @see #closeConnection
     */
    public void resetConnection() {
        synchronized (this.connectionMonitor) {
            if (this.connection != null) {
                closeConnection(this.connection);
            }
            this.connection = null;
        }
    }

    /**
     * Create a JMS Connection via this factory's target ConnectionFactory,
     * honoring the queue/topic mode hint if the target supports that domain.
     * @return the new JMS Connection
     * @throws javax.jms.JMSException if thrown by JMS API methods
     */
    protected Connection doCreateConnection() throws JMSException {
        ConnectionFactory cf = getTargetConnectionFactory();
        if (Boolean.FALSE.equals(this.pubSubMode) && cf instanceof QueueConnectionFactory) {
            return ((QueueConnectionFactory) cf).createQueueConnection();
        }
        else if (Boolean.TRUE.equals(this.pubSubMode) && cf instanceof TopicConnectionFactory) {
            return ((TopicConnectionFactory) cf).createTopicConnection();
        }
        else {
            return cf.createConnection();
        }
    }

    /**
     * Prepare the given Connection before it is exposed.
     * <p>The default implementation applies ExceptionListener and client id.
     * Can be overridden in subclasses.
     * @param con the Connection to prepare
     * @throws JMSException if thrown by JMS API methods
     * @see #setExceptionListener
     * @see #setReconnectOnException
     */
    protected void prepareConnection(Connection con) throws JMSException {
        if (getClientId() != null) {
            con.setClientID(getClientId());
        }
        if (this.aggregatedExceptionListener != null) {
            // Reuse the existing aggregator so that listeners registered
            // through proxies survive a Connection renewal.
            con.setExceptionListener(this.aggregatedExceptionListener);
        }
        else if (getExceptionListener() != null || isReconnectOnException()) {
            ExceptionListener listenerToUse = getExceptionListener();
            if (isReconnectOnException()) {
                // This factory goes first in the chain, followed by the user's listener.
                this.aggregatedExceptionListener = new AggregatedExceptionListener();
                this.aggregatedExceptionListener.delegates.add(this);
                if (listenerToUse != null) {
                    this.aggregatedExceptionListener.delegates.add(listenerToUse);
                }
                listenerToUse = this.aggregatedExceptionListener;
            }
            con.setExceptionListener(listenerToUse);
        }
    }

    /**
     * Template method for obtaining a (potentially cached) Session.
     * <p>The default implementation always returns {@code null}.
     * Subclasses may override this for exposing specific Session handles,
     * possibly delegating to {@link #createSession} for the creation of raw
     * Session objects that will then get wrapped and returned from here.
     * @param con the JMS Connection to operate on
     * @param mode the Session acknowledgement mode
     * ({@code Session.TRANSACTED} or one of the common modes)
     * @return the Session to use, or {@code null} to indicate
     * creation of a raw standard Session
     * @throws JMSException if thrown by the JMS API
     */
    protected Session getSession(Connection con, Integer mode) throws JMSException {
        return null;
    }

    /**
     * Create a default Session for this ConnectionFactory,
     * adapting to JMS 1.0.2 style queue/topic mode if necessary.
     * @param con the JMS Connection to operate on
     * @param mode the Session acknowledgement mode
     * ({@code Session.TRANSACTED} or one of the common modes)
     * @return the newly created Session
     * @throws JMSException if thrown by the JMS API
     */
    protected Session createSession(Connection con, Integer mode) throws JMSException {
        // Determine JMS API arguments...
        boolean transacted = (mode == Session.SESSION_TRANSACTED);
        int ackMode = (transacted ? Session.AUTO_ACKNOWLEDGE : mode);
        // Now actually call the appropriate JMS factory method...
        if (Boolean.FALSE.equals(this.pubSubMode) && con instanceof QueueConnection) {
            return ((QueueConnection) con).createQueueSession(transacted, ackMode);
        }
        else if (Boolean.TRUE.equals(this.pubSubMode) && con instanceof TopicConnection) {
            return ((TopicConnection) con).createTopicSession(transacted, ackMode);
        }
        else {
            return con.createSession(transacted, ackMode);
        }
    }

    /**
     * Close the given Connection, stopping it first if it has been started
     * through a proxy. Failures are logged at debug level and not propagated.
     * @param con the Connection to close
     */
    protected void closeConnection(Connection con) {
        if (logger.isDebugEnabled()) {
            logger.debug("Closing shared JMS Connection: " + con);
        }
        try {
            try {
                if (this.startedCount > 0) {
                    con.stop();
                }
            }
            finally {
                con.close();
            }
        }
        catch (javax.jms.IllegalStateException ex) {
            logger.debug("Ignoring Connection state exception - assuming already closed: " + ex);
        }
        catch (Throwable ex) {
            logger.debug("Could not close shared JMS Connection", ex);
        }
    }

    /**
     * Wrap the given Connection with a proxy that delegates every method call to it
     * but suppresses close calls. This is useful for allowing application code to
     * handle a special framework Connection just like an ordinary Connection from a
     * JMS ConnectionFactory.
     * @param target the original Connection to wrap
     * @return the wrapped Connection
     */
    protected Connection getSharedConnectionProxy(Connection target) {
        // Expose the domain-specific interfaces only if the target actually implements them.
        List<Class<?>> classes = new ArrayList<Class<?>>(3);
        classes.add(Connection.class);
        if (target instanceof QueueConnection) {
            classes.add(QueueConnection.class);
        }
        if (target instanceof TopicConnection) {
            classes.add(TopicConnection.class);
        }
        return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(),
                ClassUtils.toClassArray(classes), new SharedConnectionInvocationHandler());
    }


    /**
     * Invocation handler for a cached JMS Connection proxy.
     * <p>Each proxy tracks its own started state and its own locally registered
     * ExceptionListener; all other calls are delegated to the current shared
     * Connection as returned by {@link #getConnection()}.
     */
    private class SharedConnectionInvocationHandler implements InvocationHandler {

        private ExceptionListener localExceptionListener;

        private boolean locallyStarted = false;

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            if (methodName.equals("equals")) {
                // Proxies are equal if they belong to the same SingleConnectionFactory.
                Object other = args[0];
                if (proxy == other) {
                    return true;
                }
                if (other == null || !Proxy.isProxyClass(other.getClass())) {
                    return false;
                }
                InvocationHandler otherHandler = Proxy.getInvocationHandler(other);
                return (otherHandler instanceof SharedConnectionInvocationHandler &&
                        factory() == ((SharedConnectionInvocationHandler) otherHandler).factory());
            }
            else if (methodName.equals("hashCode")) {
                // Use hashCode of containing SingleConnectionFactory.
                return System.identityHashCode(factory());
            }
            else if (methodName.equals("toString")) {
                return "Shared JMS Connection: " + getConnection();
            }
            else if (methodName.equals("setClientID")) {
                // Handle setClientID method: throw exception if not compatible.
                String currentClientId = getConnection().getClientID();
                if (currentClientId != null && currentClientId.equals(args[0])) {
                    return null;
                }
                else {
                    throw new javax.jms.IllegalStateException(
                            "setClientID call not supported on proxy for shared Connection. " +
                            "Set the 'clientId' property on the SingleConnectionFactory instead.");
                }
            }
            else if (methodName.equals("setExceptionListener")) {
                // Handle setExceptionListener method: add to the chain.
                synchronized (connectionMonitor) {
                    if (aggregatedExceptionListener != null) {
                        ExceptionListener listener = (ExceptionListener) args[0];
                        if (listener != this.localExceptionListener) {
                            // Replace this proxy's previous listener, if any, in the chain.
                            if (this.localExceptionListener != null) {
                                aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
                            }
                            if (listener != null) {
                                aggregatedExceptionListener.delegates.add(listener);
                            }
                            this.localExceptionListener = listener;
                        }
                        return null;
                    }
                    else {
                        throw new javax.jms.IllegalStateException(
                                "setExceptionListener call not supported on proxy for shared Connection. " +
                                "Set the 'exceptionListener' property on the SingleConnectionFactory instead. " +
                                "Alternatively, activate SingleConnectionFactory's 'reconnectOnException' feature, " +
                                "which will allow for registering further ExceptionListeners to the recovery chain.");
                    }
                }
            }
            else if (methodName.equals("getExceptionListener")) {
                synchronized (connectionMonitor) {
                    if (this.localExceptionListener != null) {
                        return this.localExceptionListener;
                    }
                    else {
                        return getExceptionListener();
                    }
                }
            }
            else if (methodName.equals("start")) {
                localStart();
                return null;
            }
            else if (methodName.equals("stop")) {
                localStop();
                return null;
            }
            else if (methodName.equals("close")) {
                // Never close the shared Connection: only undo this proxy's local state.
                localStop();
                synchronized (connectionMonitor) {
                    if (this.localExceptionListener != null) {
                        if (aggregatedExceptionListener != null) {
                            aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
                        }
                        this.localExceptionListener = null;
                    }
                }
                return null;
            }
            else if (methodName.equals("createSession") || methodName.equals("createQueueSession") ||
                    methodName.equals("createTopicSession")) {
                // Default: JMS 2.0 createSession() method
                Integer mode = Session.AUTO_ACKNOWLEDGE;
                if (args != null) {
                    if (args.length == 1) {
                        // JMS 2.0 createSession(int) method
                        mode = (Integer) args[0];
                    }
                    else if (args.length == 2) {
                        // JMS 1.1 createSession(boolean, int) method
                        boolean transacted = (Boolean) args[0];
                        Integer ackMode = (Integer) args[1];
                        mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
                    }
                }
                Session session = getSession(getConnection(), mode);
                if (session != null) {
                    if (!method.getReturnType().isInstance(session)) {
                        String msg = "JMS Session does not implement specific domain: " + session;
                        try {
                            session.close();
                        }
                        catch (Throwable ex) {
                            logger.trace("Failed to close newly obtained JMS Session", ex);
                        }
                        throw new javax.jms.IllegalStateException(msg);
                    }
                    return session;
                }
                // No Session provided by the template method: fall through to the raw Connection.
            }
            try {
                return method.invoke(getConnection(), args);
            }
            catch (InvocationTargetException ex) {
                // Rethrow the exception raised by the target method itself.
                throw ex.getTargetException();
            }
        }

        /**
         * Mark this proxy as started, starting the shared Connection
         * if this is the first started proxy.
         */
        private void localStart() throws JMSException {
            synchronized (connectionMonitor) {
                if (!this.locallyStarted) {
                    this.locallyStarted = true;
                    if (startedCount == 0 && connection != null) {
                        connection.start();
                    }
                    startedCount++;
                }
            }
        }

        /**
         * Mark this proxy as stopped, stopping the shared Connection
         * if this was the last started proxy.
         */
        private void localStop() throws JMSException {
            synchronized (connectionMonitor) {
                if (this.locallyStarted) {
                    this.locallyStarted = false;
                    if (startedCount == 1 && connection != null) {
                        connection.stop();
                    }
                    if (startedCount > 0) {
                        startedCount--;
                    }
                }
            }
        }

        private SingleConnectionFactory factory() {
            return SingleConnectionFactory.this;
        }
    }


    /**
     * Internal aggregated ExceptionListener for handling the internal
     * recovery listener in combination with user-specified listeners.
     */
    private class AggregatedExceptionListener implements ExceptionListener {

        final Set<ExceptionListener> delegates = new LinkedHashSet<ExceptionListener>(2);

        @Override
        public void onException(JMSException ex) {
            // Iterate over temporary copy in order to avoid ConcurrentModificationException,
            // since listener invocations may in turn trigger registration of listeners...
            Set<ExceptionListener> copy;
            synchronized (connectionMonitor) {
                copy = new LinkedHashSet<ExceptionListener>(this.delegates);
            }
            for (ExceptionListener listener : copy) {
                listener.onException(ex);
            }
        }
    }

}