001/* 002 * Copyright 2002-2019 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.connection; 018 019import java.lang.reflect.InvocationHandler; 020import java.lang.reflect.InvocationTargetException; 021import java.lang.reflect.Method; 022import java.lang.reflect.Proxy; 023import java.util.ArrayList; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.concurrent.ConcurrentHashMap; 030import java.util.concurrent.ConcurrentMap; 031 032import javax.jms.Connection; 033import javax.jms.ConnectionFactory; 034import javax.jms.Destination; 035import javax.jms.JMSException; 036import javax.jms.MessageConsumer; 037import javax.jms.MessageProducer; 038import javax.jms.QueueSession; 039import javax.jms.Session; 040import javax.jms.TemporaryQueue; 041import javax.jms.TemporaryTopic; 042import javax.jms.Topic; 043import javax.jms.TopicSession; 044 045import org.springframework.lang.Nullable; 046import org.springframework.util.Assert; 047import org.springframework.util.ClassUtils; 048import org.springframework.util.ObjectUtils; 049 050/** 051 * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session} 052 * caching as well {@link javax.jms.MessageProducer} caching. This ConnectionFactory 053 * also switches the {@link #setReconnectOnException "reconnectOnException" property} 054 * to "true" by default, allowing for automatic recovery of the underlying Connection. 055 * 056 * <p>By default, only one single Session will be cached, with further requested 057 * Sessions being created and disposed on demand. Consider raising the 058 * {@link #setSessionCacheSize "sessionCacheSize" value} in case of a 059 * high-concurrency environment. 060 * 061 * <p>When using the JMS 1.0.2 API, this ConnectionFactory will switch 062 * into queue/topic mode according to the JMS API methods used at runtime: 063 * {@code createQueueConnection} and {@code createTopicConnection} will 064 * lead to queue/topic mode, respectively; generic {@code createConnection} 065 * calls will lead to a JMS 1.1 connection which is able to serve both modes. 066 * 067 * <p>As of Spring Framework 5, this class supports JMS 2.0 {@code JMSContext} 068 * calls and therefore requires the JMS 2.0 API to be present at runtime. 069 * It may nevertheless run against a JMS 1.1 driver (bound to the JMS 2.0 API) 070 * as long as no actual JMS 2.0 calls are triggered by the application's setup. 071 * 072 * <p><b>NOTE: This ConnectionFactory requires explicit closing of all Sessions 073 * obtained from its shared Connection.</b> This is the usual recommendation for 074 * native JMS access code anyway. However, with this ConnectionFactory, its use 075 * is mandatory in order to actually allow for Session reuse. 076 * 077 * <p>Note also that MessageConsumers obtained from a cached Session won't get 078 * closed until the Session will eventually be removed from the pool. This may 079 * lead to semantic side effects in some cases. For a durable subscriber, the 080 * logical {@code Session.close()} call will also close the subscription. 081 * Re-registering a durable consumer for the same subscription on the same 082 * Session handle is not supported; close and reobtain a cached Session first. 083 * 084 * @author Juergen Hoeller 085 * @since 2.5.3 086 */ 087public class CachingConnectionFactory extends SingleConnectionFactory { 088 089 private int sessionCacheSize = 1; 090 091 private boolean cacheProducers = true; 092 093 private boolean cacheConsumers = true; 094 095 private volatile boolean active = true; 096 097 private final ConcurrentMap<Integer, LinkedList<Session>> cachedSessions = new ConcurrentHashMap<>(); 098 099 100 /** 101 * Create a new CachingConnectionFactory for bean-style usage. 102 * @see #setTargetConnectionFactory 103 */ 104 public CachingConnectionFactory() { 105 super(); 106 setReconnectOnException(true); 107 } 108 109 /** 110 * Create a new CachingConnectionFactory for the given target 111 * ConnectionFactory. 112 * @param targetConnectionFactory the target ConnectionFactory 113 */ 114 public CachingConnectionFactory(ConnectionFactory targetConnectionFactory) { 115 super(targetConnectionFactory); 116 setReconnectOnException(true); 117 } 118 119 120 /** 121 * Specify the desired size for the JMS Session cache (per JMS Session type). 122 * <p>This cache size is the maximum limit for the number of cached Sessions 123 * per session acknowledgement type (auto, client, dups_ok, transacted). 124 * As a consequence, the actual number of cached Sessions may be up to 125 * four times as high as the specified value - in the unlikely case 126 * of mixing and matching different acknowledgement types. 127 * <p>Default is 1: caching a single Session, (re-)creating further ones on 128 * demand. Specify a number like 10 if you'd like to raise the number of cached 129 * Sessions; that said, 1 may be sufficient for low-concurrency scenarios. 130 * @see #setCacheProducers 131 */ 132 public void setSessionCacheSize(int sessionCacheSize) { 133 Assert.isTrue(sessionCacheSize >= 1, "Session cache size must be 1 or higher"); 134 this.sessionCacheSize = sessionCacheSize; 135 } 136 137 /** 138 * Return the desired size for the JMS Session cache (per JMS Session type). 139 */ 140 public int getSessionCacheSize() { 141 return this.sessionCacheSize; 142 } 143 144 /** 145 * Specify whether to cache JMS MessageProducers per JMS Session instance 146 * (more specifically: one MessageProducer per Destination and Session). 147 * <p>Default is "true". Switch this to "false" in order to always 148 * recreate MessageProducers on demand. 149 */ 150 public void setCacheProducers(boolean cacheProducers) { 151 this.cacheProducers = cacheProducers; 152 } 153 154 /** 155 * Return whether to cache JMS MessageProducers per JMS Session instance. 156 */ 157 public boolean isCacheProducers() { 158 return this.cacheProducers; 159 } 160 161 /** 162 * Specify whether to cache JMS MessageConsumers per JMS Session instance 163 * (more specifically: one MessageConsumer per Destination, selector String 164 * and Session). Note that durable subscribers will only be cached until 165 * logical closing of the Session handle. 166 * <p>Default is "true". Switch this to "false" in order to always 167 * recreate MessageConsumers on demand. 168 */ 169 public void setCacheConsumers(boolean cacheConsumers) { 170 this.cacheConsumers = cacheConsumers; 171 } 172 173 /** 174 * Return whether to cache JMS MessageConsumers per JMS Session instance. 175 */ 176 public boolean isCacheConsumers() { 177 return this.cacheConsumers; 178 } 179 180 181 /** 182 * Resets the Session cache as well. 183 */ 184 @Override 185 public void resetConnection() { 186 this.active = false; 187 188 synchronized (this.cachedSessions) { 189 for (LinkedList<Session> sessionList : this.cachedSessions.values()) { 190 synchronized (sessionList) { 191 for (Session session : sessionList) { 192 try { 193 session.close(); 194 } 195 catch (Throwable ex) { 196 logger.trace("Could not close cached JMS Session", ex); 197 } 198 } 199 } 200 } 201 this.cachedSessions.clear(); 202 } 203 204 // Now proceed with actual closing of the shared Connection... 205 super.resetConnection(); 206 207 this.active = true; 208 } 209 210 /** 211 * Checks for a cached Session for the given mode. 212 */ 213 @Override 214 protected Session getSession(Connection con, Integer mode) throws JMSException { 215 if (!this.active) { 216 return null; 217 } 218 219 LinkedList<Session> sessionList = this.cachedSessions.computeIfAbsent(mode, k -> new LinkedList<>()); 220 Session session = null; 221 synchronized (sessionList) { 222 if (!sessionList.isEmpty()) { 223 session = sessionList.removeFirst(); 224 } 225 } 226 if (session != null) { 227 if (logger.isTraceEnabled()) { 228 logger.trace("Found cached JMS Session for mode " + mode + ": " + 229 (session instanceof SessionProxy ? ((SessionProxy) session).getTargetSession() : session)); 230 } 231 } 232 else { 233 Session targetSession = createSession(con, mode); 234 if (logger.isDebugEnabled()) { 235 logger.debug("Registering cached JMS Session for mode " + mode + ": " + targetSession); 236 } 237 session = getCachedSessionProxy(targetSession, sessionList); 238 } 239 return session; 240 } 241 242 /** 243 * Wrap the given Session with a proxy that delegates every method call to it 244 * but adapts close calls. This is useful for allowing application code to 245 * handle a special framework Session just like an ordinary Session. 246 * @param target the original Session to wrap 247 * @param sessionList the List of cached Sessions that the given Session belongs to 248 * @return the wrapped Session 249 */ 250 protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) { 251 List<Class<?>> classes = new ArrayList<>(3); 252 classes.add(SessionProxy.class); 253 if (target instanceof QueueSession) { 254 classes.add(QueueSession.class); 255 } 256 if (target instanceof TopicSession) { 257 classes.add(TopicSession.class); 258 } 259 return (Session) Proxy.newProxyInstance(SessionProxy.class.getClassLoader(), 260 ClassUtils.toClassArray(classes), new CachedSessionInvocationHandler(target, sessionList)); 261 } 262 263 264 /** 265 * Invocation handler for a cached JMS Session proxy. 266 */ 267 private class CachedSessionInvocationHandler implements InvocationHandler { 268 269 private final Session target; 270 271 private final LinkedList<Session> sessionList; 272 273 private final Map<DestinationCacheKey, MessageProducer> cachedProducers = new HashMap<>(); 274 275 private final Map<ConsumerCacheKey, MessageConsumer> cachedConsumers = new HashMap<>(); 276 277 private boolean transactionOpen = false; 278 279 public CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList) { 280 this.target = target; 281 this.sessionList = sessionList; 282 } 283 284 @Override 285 @Nullable 286 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 287 String methodName = method.getName(); 288 if (methodName.equals("equals")) { 289 // Only consider equal when proxies are identical. 290 return (proxy == args[0]); 291 } 292 else if (methodName.equals("hashCode")) { 293 // Use hashCode of Session proxy. 294 return System.identityHashCode(proxy); 295 } 296 else if (methodName.equals("toString")) { 297 return "Cached JMS Session: " + this.target; 298 } 299 else if (methodName.equals("close")) { 300 // Handle close method: don't pass the call on. 301 if (active) { 302 synchronized (this.sessionList) { 303 if (this.sessionList.size() < getSessionCacheSize()) { 304 try { 305 logicalClose((Session) proxy); 306 // Remain open in the session list. 307 return null; 308 309 catch (JMSException ex) { 310 logger.trace("Logical close of cached JMS Session failed - discarding it", ex); 311 // Proceed to physical close from here... 312 } 313 } 314 } 315 } 316 // If we get here, we're supposed to shut down. 317 physicalClose(); 318 return null; 319 } 320 else if (methodName.equals("getTargetSession")) { 321 // Handle getTargetSession method: return underlying Session. 322 return this.target; 323 } 324 else if (methodName.equals("commit") || methodName.equals("rollback")) { 325 this.transactionOpen = false; 326 } 327 else if (methodName.startsWith("create")) { 328 this.transactionOpen = true; 329 if (isCacheProducers() && (methodName.equals("createProducer") || 330 methodName.equals("createSender") || methodName.equals("createPublisher"))) { 331 // Destination argument being null is ok for a producer 332 Destination dest = (Destination) args[0]; 333 if (!(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) { 334 return getCachedProducer(dest); 335 } 336 } 337 else if (isCacheConsumers()) { 338 // let raw JMS invocation throw an exception if Destination (i.e. args[0]) is null 339 if ((methodName.equals("createConsumer") || methodName.equals("createReceiver") || 340 methodName.equals("createSubscriber"))) { 341 Destination dest = (Destination) args[0]; 342 if (dest != null && !(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) { 343 return getCachedConsumer(dest, 344 (args.length > 1 ? (String) args[1] : null), 345 (args.length > 2 && (Boolean) args[2]), 346 null, 347 false); 348 } 349 } 350 else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) { 351 Destination dest = (Destination) args[0]; 352 if (dest != null) { 353 return getCachedConsumer(dest, 354 (args.length > 2 ? (String) args[2] : null), 355 (args.length > 3 && (Boolean) args[3]), 356 (String) args[1], 357 true); 358 } 359 } 360 else if (methodName.equals("createSharedConsumer")) { 361 Destination dest = (Destination) args[0]; 362 if (dest != null) { 363 return getCachedConsumer(dest, 364 (args.length > 2 ? (String) args[2] : null), 365 null, 366 (String) args[1], 367 false); 368 } 369 } 370 else if (methodName.equals("createSharedDurableConsumer")) { 371 Destination dest = (Destination) args[0]; 372 if (dest != null) { 373 return getCachedConsumer(dest, 374 (args.length > 2 ? (String) args[2] : null), 375 null, 376 (String) args[1], 377 true); 378 } 379 } 380 } 381 } 382 try { 383 return method.invoke(this.target, args); 384 } 385 catch (InvocationTargetException ex) { 386 throw ex.getTargetException(); 387 } 388 } 389 390 private MessageProducer getCachedProducer(@Nullable Destination dest) throws JMSException { 391 DestinationCacheKey cacheKey = (dest != null ? new DestinationCacheKey(dest) : null); 392 MessageProducer producer = this.cachedProducers.get(cacheKey); 393 if (producer != null) { 394 if (logger.isTraceEnabled()) { 395 logger.trace("Found cached JMS MessageProducer for destination [" + dest + "]: " + producer); 396 } 397 } 398 else { 399 producer = this.target.createProducer(dest); 400 if (logger.isDebugEnabled()) { 401 logger.debug("Registering cached JMS MessageProducer for destination [" + dest + "]: " + producer); 402 } 403 this.cachedProducers.put(cacheKey, producer); 404 } 405 return new CachedMessageProducer(producer); 406 } 407 408 private MessageConsumer getCachedConsumer(Destination dest, @Nullable String selector, 409 @Nullable Boolean noLocal, @Nullable String subscription, boolean durable) throws JMSException { 410 411 ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription, durable); 412 MessageConsumer consumer = this.cachedConsumers.get(cacheKey); 413 if (consumer != null) { 414 if (logger.isTraceEnabled()) { 415 logger.trace("Found cached JMS MessageConsumer for destination [" + dest + "]: " + consumer); 416 } 417 } 418 else { 419 if (dest instanceof Topic) { 420 if (noLocal == null) { 421 consumer = (durable ? 422 this.target.createSharedDurableConsumer((Topic) dest, subscription, selector) : 423 this.target.createSharedConsumer((Topic) dest, subscription, selector)); 424 } 425 else { 426 consumer = (durable ? 427 this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) : 428 this.target.createConsumer(dest, selector, noLocal)); 429 } 430 } 431 else { 432 consumer = this.target.createConsumer(dest, selector); 433 } 434 if (logger.isDebugEnabled()) { 435 logger.debug("Registering cached JMS MessageConsumer for destination [" + dest + "]: " + consumer); 436 } 437 this.cachedConsumers.put(cacheKey, consumer); 438 } 439 return new CachedMessageConsumer(consumer); 440 } 441 442 private void logicalClose(Session proxy) throws JMSException { 443 // Preserve rollback-on-close semantics. 444 if (this.transactionOpen && this.target.getTransacted()) { 445 this.transactionOpen = false; 446 this.target.rollback(); 447 } 448 // Physically close durable subscribers at time of Session close call. 449 for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) { 450 Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next(); 451 if (entry.getKey().subscription != null) { 452 entry.getValue().close(); 453 it.remove(); 454 } 455 } 456 // Allow for multiple close calls... 457 boolean returned = false; 458 synchronized (this.sessionList) { 459 if (!this.sessionList.contains(proxy)) { 460 this.sessionList.addLast(proxy); 461 returned = true; 462 } 463 } 464 if (returned && logger.isTraceEnabled()) { 465 logger.trace("Returned cached Session: " + this.target); 466 } 467 } 468 469 private void physicalClose() throws JMSException { 470 if (logger.isDebugEnabled()) { 471 logger.debug("Closing cached Session: " + this.target); 472 } 473 // Explicitly close all MessageProducers and MessageConsumers that 474 // this Session happens to cache... 475 try { 476 for (MessageProducer producer : this.cachedProducers.values()) { 477 producer.close(); 478 } 479 for (MessageConsumer consumer : this.cachedConsumers.values()) { 480 consumer.close(); 481 } 482 } 483 finally { 484 this.cachedProducers.clear(); 485 this.cachedConsumers.clear(); 486 // Now actually close the Session. 487 this.target.close(); 488 } 489 } 490 } 491 492 493 /** 494 * Simple wrapper class around a Destination reference. 495 * Used as the cache key when caching MessageProducer objects. 496 */ 497 private static class DestinationCacheKey implements Comparable<DestinationCacheKey> { 498 499 private final Destination destination; 500 501 @Nullable 502 private String destinationString; 503 504 public DestinationCacheKey(Destination destination) { 505 Assert.notNull(destination, "Destination must not be null"); 506 this.destination = destination; 507 } 508 509 private String getDestinationString() { 510 if (this.destinationString == null) { 511 this.destinationString = this.destination.toString(); 512 } 513 return this.destinationString; 514 } 515 516 protected boolean destinationEquals(DestinationCacheKey otherKey) { 517 return (this.destination.getClass() == otherKey.destination.getClass() && 518 (this.destination.equals(otherKey.destination) || 519 getDestinationString().equals(otherKey.getDestinationString()))); 520 } 521 522 @Override 523 public boolean equals(@Nullable Object other) { 524 // Effectively checking object equality as well as toString equality. 525 // On WebSphere MQ, Destination objects do not implement equals... 526 return (this == other || (other instanceof DestinationCacheKey && 527 destinationEquals((DestinationCacheKey) other))); 528 } 529 530 @Override 531 public int hashCode() { 532 // Can't use a more specific hashCode since we can't rely on 533 // this.destination.hashCode() actually being the same value 534 // for equivalent destinations... Thanks a lot, WebSphere MQ! 535 return this.destination.getClass().hashCode(); 536 } 537 538 @Override 539 public String toString() { 540 return getDestinationString(); 541 } 542 543 @Override 544 public int compareTo(DestinationCacheKey other) { 545 return getDestinationString().compareTo(other.getDestinationString()); 546 } 547 } 548 549 550 /** 551 * Simple wrapper class around a Destination and other consumer attributes. 552 * Used as the cache key when caching MessageConsumer objects. 553 */ 554 private static class ConsumerCacheKey extends DestinationCacheKey { 555 556 @Nullable 557 private final String selector; 558 559 @Nullable 560 private final Boolean noLocal; 561 562 @Nullable 563 private final String subscription; 564 565 private final boolean durable; 566 567 public ConsumerCacheKey(Destination destination, @Nullable String selector, @Nullable Boolean noLocal, 568 @Nullable String subscription, boolean durable) { 569 570 super(destination); 571 this.selector = selector; 572 this.noLocal = noLocal; 573 this.subscription = subscription; 574 this.durable = durable; 575 } 576 577 @Override 578 public boolean equals(@Nullable Object other) { 579 if (this == other) { 580 return true; 581 } 582 if (!(other instanceof ConsumerCacheKey)) { 583 return false; 584 } 585 ConsumerCacheKey otherKey = (ConsumerCacheKey) other; 586 return (destinationEquals(otherKey) && 587 ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) && 588 ObjectUtils.nullSafeEquals(this.noLocal, otherKey.noLocal) && 589 ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription) && 590 this.durable == otherKey.durable); 591 } 592 593 @Override 594 public int hashCode() { 595 return (31 * super.hashCode() + ObjectUtils.nullSafeHashCode(this.selector)); 596 } 597 598 @Override 599 public String toString() { 600 return super.toString() + " [selector=" + this.selector + ", noLocal=" + this.noLocal + 601 ", subscription=" + this.subscription + ", durable=" + this.durable + "]"; 602 } 603 } 604 605}