001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.connection; 018 019import java.lang.reflect.InvocationHandler; 020import java.lang.reflect.InvocationTargetException; 021import java.lang.reflect.Method; 022import java.lang.reflect.Proxy; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import javax.jms.Connection; 030import javax.jms.ConnectionFactory; 031import javax.jms.Destination; 032import javax.jms.JMSException; 033import javax.jms.MessageConsumer; 034import javax.jms.MessageProducer; 035import javax.jms.QueueSession; 036import javax.jms.Session; 037import javax.jms.TemporaryQueue; 038import javax.jms.TemporaryTopic; 039import javax.jms.Topic; 040import javax.jms.TopicSession; 041 042import org.springframework.util.Assert; 043import org.springframework.util.ClassUtils; 044import org.springframework.util.ObjectUtils; 045import org.springframework.util.ReflectionUtils; 046 047/** 048 * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session} 049 * caching as well {@link javax.jms.MessageProducer} caching. This ConnectionFactory 050 * also switches the {@link #setReconnectOnException "reconnectOnException" property} 051 * to "true" by default, allowing for automatic recovery of the underlying Connection. 052 * 053 * <p>By default, only one single Session will be cached, with further requested 054 * Sessions being created and disposed on demand. Consider raising the 055 * {@link #setSessionCacheSize "sessionCacheSize" value} in case of a 056 * high-concurrency environment. 057 * 058 * <p>When using the JMS 1.0.2 API, this ConnectionFactory will switch 059 * into queue/topic mode according to the JMS API methods used at runtime: 060 * {@code createQueueConnection} and {@code createTopicConnection} will 061 * lead to queue/topic mode, respectively; generic {@code createConnection} 062 * calls will lead to a JMS 1.1 connection which is able to serve both modes. 063 * 064 * <p><b>NOTE: This ConnectionFactory requires explicit closing of all Sessions 065 * obtained from its shared Connection.</b> This is the usual recommendation for 066 * native JMS access code anyway. However, with this ConnectionFactory, its use 067 * is mandatory in order to actually allow for Session reuse. 068 * 069 * <p>Note also that MessageConsumers obtained from a cached Session won't get 070 * closed until the Session will eventually be removed from the pool. This may 071 * lead to semantic side effects in some cases. For a durable subscriber, the 072 * logical {@code Session.close()} call will also close the subscription. 073 * Re-registering a durable consumer for the same subscription on the same 074 * Session handle is not supported; close and reobtain a cached Session first. 075 * 076 * @author Juergen Hoeller 077 * @since 2.5.3 078 */ 079public class CachingConnectionFactory extends SingleConnectionFactory { 080 081 /** The JMS 2.0 Session.createSharedConsumer method, if available */ 082 private static final Method createSharedConsumerMethod = ClassUtils.getMethodIfAvailable( 083 Session.class, "createSharedConsumer", Topic.class, String.class, String.class); 084 085 /** The JMS 2.0 Session.createSharedDurableConsumer method, if available */ 086 private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable( 087 Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class); 088 089 090 private int sessionCacheSize = 1; 091 092 private boolean cacheProducers = true; 093 094 private boolean cacheConsumers = true; 095 096 private volatile boolean active = true; 097 098 private final Map<Integer, LinkedList<Session>> cachedSessions = 099 new HashMap<Integer, LinkedList<Session>>(); 100 101 102 /** 103 * Create a new CachingConnectionFactory for bean-style usage. 104 * @see #setTargetConnectionFactory 105 */ 106 public CachingConnectionFactory() { 107 super(); 108 setReconnectOnException(true); 109 } 110 111 /** 112 * Create a new CachingConnectionFactory for the given target 113 * ConnectionFactory. 114 * @param targetConnectionFactory the target ConnectionFactory 115 */ 116 public CachingConnectionFactory(ConnectionFactory targetConnectionFactory) { 117 super(targetConnectionFactory); 118 setReconnectOnException(true); 119 } 120 121 122 /** 123 * Specify the desired size for the JMS Session cache (per JMS Session type). 124 * <p>This cache size is the maximum limit for the number of cached Sessions 125 * per session acknowledgement type (auto, client, dups_ok, transacted). 126 * As a consequence, the actual number of cached Sessions may be up to 127 * four times as high as the specified value - in the unlikely case 128 * of mixing and matching different acknowledgement types. 129 * <p>Default is 1: caching a single Session, (re-)creating further ones on 130 * demand. Specify a number like 10 if you'd like to raise the number of cached 131 * Sessions; that said, 1 may be sufficient for low-concurrency scenarios. 132 * @see #setCacheProducers 133 */ 134 public void setSessionCacheSize(int sessionCacheSize) { 135 Assert.isTrue(sessionCacheSize >= 1, "Session cache size must be 1 or higher"); 136 this.sessionCacheSize = sessionCacheSize; 137 } 138 139 /** 140 * Return the desired size for the JMS Session cache (per JMS Session type). 141 */ 142 public int getSessionCacheSize() { 143 return this.sessionCacheSize; 144 } 145 146 /** 147 * Specify whether to cache JMS MessageProducers per JMS Session instance 148 * (more specifically: one MessageProducer per Destination and Session). 149 * <p>Default is "true". Switch this to "false" in order to always 150 * recreate MessageProducers on demand. 151 */ 152 public void setCacheProducers(boolean cacheProducers) { 153 this.cacheProducers = cacheProducers; 154 } 155 156 /** 157 * Return whether to cache JMS MessageProducers per JMS Session instance. 158 */ 159 public boolean isCacheProducers() { 160 return this.cacheProducers; 161 } 162 163 /** 164 * Specify whether to cache JMS MessageConsumers per JMS Session instance 165 * (more specifically: one MessageConsumer per Destination, selector String 166 * and Session). Note that durable subscribers will only be cached until 167 * logical closing of the Session handle. 168 * <p>Default is "true". Switch this to "false" in order to always 169 * recreate MessageConsumers on demand. 170 */ 171 public void setCacheConsumers(boolean cacheConsumers) { 172 this.cacheConsumers = cacheConsumers; 173 } 174 175 /** 176 * Return whether to cache JMS MessageConsumers per JMS Session instance. 177 */ 178 public boolean isCacheConsumers() { 179 return this.cacheConsumers; 180 } 181 182 183 /** 184 * Resets the Session cache as well. 185 */ 186 @Override 187 public void resetConnection() { 188 this.active = false; 189 190 synchronized (this.cachedSessions) { 191 for (LinkedList<Session> sessionList : this.cachedSessions.values()) { 192 synchronized (sessionList) { 193 for (Session session : sessionList) { 194 try { 195 session.close(); 196 } 197 catch (Throwable ex) { 198 logger.trace("Could not close cached JMS Session", ex); 199 } 200 } 201 } 202 } 203 this.cachedSessions.clear(); 204 } 205 206 // Now proceed with actual closing of the shared Connection... 207 super.resetConnection(); 208 209 this.active = true; 210 } 211 212 /** 213 * Checks for a cached Session for the given mode. 214 */ 215 @Override 216 protected Session getSession(Connection con, Integer mode) throws JMSException { 217 if (!this.active) { 218 return null; 219 } 220 221 LinkedList<Session> sessionList; 222 synchronized (this.cachedSessions) { 223 sessionList = this.cachedSessions.get(mode); 224 if (sessionList == null) { 225 sessionList = new LinkedList<Session>(); 226 this.cachedSessions.put(mode, sessionList); 227 } 228 } 229 Session session = null; 230 synchronized (sessionList) { 231 if (!sessionList.isEmpty()) { 232 session = sessionList.removeFirst(); 233 } 234 } 235 if (session != null) { 236 if (logger.isTraceEnabled()) { 237 logger.trace("Found cached JMS Session for mode " + mode + ": " + 238 (session instanceof SessionProxy ? ((SessionProxy) session).getTargetSession() : session)); 239 } 240 } 241 else { 242 Session targetSession = createSession(con, mode); 243 if (logger.isDebugEnabled()) { 244 logger.debug("Registering cached JMS Session for mode " + mode + ": " + targetSession); 245 } 246 session = getCachedSessionProxy(targetSession, sessionList); 247 } 248 return session; 249 } 250 251 /** 252 * Wrap the given Session with a proxy that delegates every method call to it 253 * but adapts close calls. This is useful for allowing application code to 254 * handle a special framework Session just like an ordinary Session. 255 * @param target the original Session to wrap 256 * @param sessionList the List of cached Sessions that the given Session belongs to 257 * @return the wrapped Session 258 */ 259 protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) { 260 List<Class<?>> classes = new ArrayList<Class<?>>(3); 261 classes.add(SessionProxy.class); 262 if (target instanceof QueueSession) { 263 classes.add(QueueSession.class); 264 } 265 if (target instanceof TopicSession) { 266 classes.add(TopicSession.class); 267 } 268 return (Session) Proxy.newProxyInstance(SessionProxy.class.getClassLoader(), 269 ClassUtils.toClassArray(classes), new CachedSessionInvocationHandler(target, sessionList)); 270 } 271 272 273 /** 274 * Invocation handler for a cached JMS Session proxy. 275 */ 276 private class CachedSessionInvocationHandler implements InvocationHandler { 277 278 private final Session target; 279 280 private final LinkedList<Session> sessionList; 281 282 private final Map<DestinationCacheKey, MessageProducer> cachedProducers = 283 new HashMap<DestinationCacheKey, MessageProducer>(); 284 285 private final Map<ConsumerCacheKey, MessageConsumer> cachedConsumers = 286 new HashMap<ConsumerCacheKey, MessageConsumer>(); 287 288 private boolean transactionOpen = false; 289 290 public CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList) { 291 this.target = target; 292 this.sessionList = sessionList; 293 } 294 295 @Override 296 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 297 String methodName = method.getName(); 298 if (methodName.equals("equals")) { 299 // Only consider equal when proxies are identical. 300 return (proxy == args[0]); 301 } 302 else if (methodName.equals("hashCode")) { 303 // Use hashCode of Session proxy. 304 return System.identityHashCode(proxy); 305 } 306 else if (methodName.equals("toString")) { 307 return "Cached JMS Session: " + this.target; 308 } 309 else if (methodName.equals("close")) { 310 // Handle close method: don't pass the call on. 311 if (active) { 312 synchronized (this.sessionList) { 313 if (this.sessionList.size() < getSessionCacheSize()) { 314 try { 315 logicalClose((Session) proxy); 316 // Remain open in the session list. 317 return null; 318 } 319 catch (JMSException ex) { 320 logger.trace("Logical close of cached JMS Session failed - discarding it", ex); 321 // Proceed to physical close from here... 322 } 323 } 324 } 325 } 326 // If we get here, we're supposed to shut down. 327 physicalClose(); 328 return null; 329 } 330 else if (methodName.equals("getTargetSession")) { 331 // Handle getTargetSession method: return underlying Session. 332 return this.target; 333 } 334 else if (methodName.equals("commit") || methodName.equals("rollback")) { 335 this.transactionOpen = false; 336 } 337 else if (methodName.startsWith("create")) { 338 this.transactionOpen = true; 339 if (isCacheProducers() && (methodName.equals("createProducer") || 340 methodName.equals("createSender") || methodName.equals("createPublisher"))) { 341 // Destination argument being null is ok for a producer 342 Destination dest = (Destination) args[0]; 343 if (!(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) { 344 return getCachedProducer(dest); 345 } 346 } 347 else if (isCacheConsumers()) { 348 // let raw JMS invocation throw an exception if Destination (i.e. args[0]) is null 349 if ((methodName.equals("createConsumer") || methodName.equals("createReceiver") || 350 methodName.equals("createSubscriber"))) { 351 Destination dest = (Destination) args[0]; 352 if (dest != null && !(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) { 353 return getCachedConsumer(dest, 354 (args.length > 1 ? (String) args[1] : null), 355 (args.length > 2 && (Boolean) args[2]), 356 null, 357 false); 358 } 359 } 360 else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) { 361 Destination dest = (Destination) args[0]; 362 if (dest != null) { 363 return getCachedConsumer(dest, 364 (args.length > 2 ? (String) args[2] : null), 365 (args.length > 3 && (Boolean) args[3]), 366 (String) args[1], 367 true); 368 } 369 } 370 else if (methodName.equals("createSharedConsumer")) { 371 Destination dest = (Destination) args[0]; 372 if (dest != null) { 373 return getCachedConsumer(dest, 374 (args.length > 2 ? (String) args[2] : null), 375 null, 376 (String) args[1], 377 false); 378 } 379 } 380 else if (methodName.equals("createSharedDurableConsumer")) { 381 Destination dest = (Destination) args[0]; 382 if (dest != null) { 383 return getCachedConsumer(dest, 384 (args.length > 2 ? (String) args[2] : null), 385 null, 386 (String) args[1], 387 true); 388 } 389 } 390 } 391 } 392 try { 393 return method.invoke(this.target, args); 394 } 395 catch (InvocationTargetException ex) { 396 throw ex.getTargetException(); 397 } 398 } 399 400 private MessageProducer getCachedProducer(Destination dest) throws JMSException { 401 DestinationCacheKey cacheKey = (dest != null ? new DestinationCacheKey(dest) : null); 402 MessageProducer producer = this.cachedProducers.get(cacheKey); 403 if (producer != null) { 404 if (logger.isTraceEnabled()) { 405 logger.trace("Found cached JMS MessageProducer for destination [" + dest + "]: " + producer); 406 } 407 } 408 else { 409 producer = this.target.createProducer(dest); 410 if (logger.isDebugEnabled()) { 411 logger.debug("Registering cached JMS MessageProducer for destination [" + dest + "]: " + producer); 412 } 413 this.cachedProducers.put(cacheKey, producer); 414 } 415 return new CachedMessageProducer(producer).getProxyIfNecessary(); 416 } 417 418 private MessageConsumer getCachedConsumer( 419 Destination dest, String selector, Boolean noLocal, String subscription, boolean durable) throws JMSException { 420 421 ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription, durable); 422 MessageConsumer consumer = this.cachedConsumers.get(cacheKey); 423 if (consumer != null) { 424 if (logger.isTraceEnabled()) { 425 logger.trace("Found cached JMS MessageConsumer for destination [" + dest + "]: " + consumer); 426 } 427 } 428 else { 429 if (dest instanceof Topic) { 430 if (noLocal == null) { 431 // createSharedConsumer((Topic) dest, subscription, selector); 432 // createSharedDurableConsumer((Topic) dest, subscription, selector); 433 Method method = (durable ? createSharedDurableConsumerMethod : createSharedConsumerMethod); 434 try { 435 consumer = (MessageConsumer) method.invoke(this.target, dest, subscription, selector); 436 } 437 catch (InvocationTargetException ex) { 438 if (ex.getTargetException() instanceof JMSException) { 439 throw (JMSException) ex.getTargetException(); 440 } 441 ReflectionUtils.handleInvocationTargetException(ex); 442 } 443 catch (IllegalAccessException ex) { 444 throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage()); 445 } 446 } 447 else { 448 consumer = (durable ? 449 this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) : 450 this.target.createConsumer(dest, selector, noLocal)); 451 } 452 } 453 else { 454 consumer = this.target.createConsumer(dest, selector); 455 } 456 if (logger.isDebugEnabled()) { 457 logger.debug("Registering cached JMS MessageConsumer for destination [" + dest + "]: " + consumer); 458 } 459 this.cachedConsumers.put(cacheKey, consumer); 460 } 461 return new CachedMessageConsumer(consumer); 462 } 463 464 private void logicalClose(Session proxy) throws JMSException { 465 // Preserve rollback-on-close semantics. 466 if (this.transactionOpen && this.target.getTransacted()) { 467 this.transactionOpen = false; 468 this.target.rollback(); 469 } 470 // Physically close durable subscribers at time of Session close call. 471 for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) { 472 Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next(); 473 if (entry.getKey().subscription != null) { 474 entry.getValue().close(); 475 it.remove(); 476 } 477 } 478 // Allow for multiple close calls... 479 boolean returned = false; 480 synchronized (this.sessionList) { 481 if (!this.sessionList.contains(proxy)) { 482 this.sessionList.addLast(proxy); 483 returned = true; 484 } 485 } 486 if (returned && logger.isTraceEnabled()) { 487 logger.trace("Returned cached Session: " + this.target); 488 } 489 } 490 491 private void physicalClose() throws JMSException { 492 if (logger.isDebugEnabled()) { 493 logger.debug("Closing cached Session: " + this.target); 494 } 495 // Explicitly close all MessageProducers and MessageConsumers that 496 // this Session happens to cache... 497 try { 498 for (MessageProducer producer : this.cachedProducers.values()) { 499 producer.close(); 500 } 501 for (MessageConsumer consumer : this.cachedConsumers.values()) { 502 consumer.close(); 503 } 504 } 505 finally { 506 this.cachedProducers.clear(); 507 this.cachedConsumers.clear(); 508 // Now actually close the Session. 509 this.target.close(); 510 } 511 } 512 } 513 514 515 /** 516 * Simple wrapper class around a Destination reference. 517 * Used as the cache key when caching MessageProducer objects. 518 */ 519 private static class DestinationCacheKey implements Comparable<DestinationCacheKey> { 520 521 private final Destination destination; 522 523 private String destinationString; 524 525 public DestinationCacheKey(Destination destination) { 526 Assert.notNull(destination, "Destination must not be null"); 527 this.destination = destination; 528 } 529 530 private String getDestinationString() { 531 if (this.destinationString == null) { 532 this.destinationString = this.destination.toString(); 533 } 534 return this.destinationString; 535 } 536 537 protected boolean destinationEquals(DestinationCacheKey otherKey) { 538 return (this.destination.getClass() == otherKey.destination.getClass() && 539 (this.destination.equals(otherKey.destination) || 540 getDestinationString().equals(otherKey.getDestinationString()))); 541 } 542 543 @Override 544 public boolean equals(Object other) { 545 // Effectively checking object equality as well as toString equality. 546 // On WebSphere MQ, Destination objects do not implement equals... 547 return (this == other || destinationEquals((DestinationCacheKey) other)); 548 } 549 550 @Override 551 public int hashCode() { 552 // Can't use a more specific hashCode since we can't rely on 553 // this.destination.hashCode() actually being the same value 554 // for equivalent destinations... Thanks a lot, WebSphere MQ! 555 return this.destination.getClass().hashCode(); 556 } 557 558 @Override 559 public String toString() { 560 return getDestinationString(); 561 } 562 563 @Override 564 public int compareTo(DestinationCacheKey other) { 565 return getDestinationString().compareTo(other.getDestinationString()); 566 } 567 } 568 569 570 /** 571 * Simple wrapper class around a Destination and other consumer attributes. 572 * Used as the cache key when caching MessageConsumer objects. 573 */ 574 private static class ConsumerCacheKey extends DestinationCacheKey { 575 576 private final String selector; 577 578 private final Boolean noLocal; 579 580 private final String subscription; 581 582 private final boolean durable; 583 584 public ConsumerCacheKey(Destination destination, String selector, Boolean noLocal, String subscription, boolean durable) { 585 super(destination); 586 this.selector = selector; 587 this.noLocal = noLocal; 588 this.subscription = subscription; 589 this.durable = durable; 590 } 591 592 @Override 593 public boolean equals(Object other) { 594 if (this == other) { 595 return true; 596 } 597 ConsumerCacheKey otherKey = (ConsumerCacheKey) other; 598 return (destinationEquals(otherKey) && 599 ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) && 600 ObjectUtils.nullSafeEquals(this.noLocal, otherKey.noLocal) && 601 ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription) && 602 this.durable == otherKey.durable); 603 } 604 605 @Override 606 public String toString() { 607 return super.toString() + " [selector=" + this.selector + ", noLocal=" + this.noLocal + 608 ", subscription=" + this.subscription + ", durable=" + this.durable + "]"; 609 } 610 } 611 612}