001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.connection;
018
019import java.lang.reflect.InvocationHandler;
020import java.lang.reflect.InvocationTargetException;
021import java.lang.reflect.Method;
022import java.lang.reflect.Proxy;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentMap;
031
032import javax.jms.Connection;
033import javax.jms.ConnectionFactory;
034import javax.jms.Destination;
035import javax.jms.JMSException;
036import javax.jms.MessageConsumer;
037import javax.jms.MessageProducer;
038import javax.jms.QueueSession;
039import javax.jms.Session;
040import javax.jms.TemporaryQueue;
041import javax.jms.TemporaryTopic;
042import javax.jms.Topic;
043import javax.jms.TopicSession;
044
045import org.springframework.lang.Nullable;
046import org.springframework.util.Assert;
047import org.springframework.util.ClassUtils;
048import org.springframework.util.ObjectUtils;
049
050/**
051 * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session}
052 * caching as well {@link javax.jms.MessageProducer} caching. This ConnectionFactory
053 * also switches the {@link #setReconnectOnException "reconnectOnException" property}
054 * to "true" by default, allowing for automatic recovery of the underlying Connection.
055 *
056 * <p>By default, only one single Session will be cached, with further requested
057 * Sessions being created and disposed on demand. Consider raising the
058 * {@link #setSessionCacheSize "sessionCacheSize" value} in case of a
059 * high-concurrency environment.
060 *
061 * <p>When using the JMS 1.0.2 API, this ConnectionFactory will switch
062 * into queue/topic mode according to the JMS API methods used at runtime:
063 * {@code createQueueConnection} and {@code createTopicConnection} will
064 * lead to queue/topic mode, respectively; generic {@code createConnection}
065 * calls will lead to a JMS 1.1 connection which is able to serve both modes.
066 *
067 * <p>As of Spring Framework 5, this class supports JMS 2.0 {@code JMSContext}
068 * calls and therefore requires the JMS 2.0 API to be present at runtime.
069 * It may nevertheless run against a JMS 1.1 driver (bound to the JMS 2.0 API)
070 * as long as no actual JMS 2.0 calls are triggered by the application's setup.
071 *
072 * <p><b>NOTE: This ConnectionFactory requires explicit closing of all Sessions
073 * obtained from its shared Connection.</b> This is the usual recommendation for
074 * native JMS access code anyway. However, with this ConnectionFactory, its use
075 * is mandatory in order to actually allow for Session reuse.
076 *
077 * <p>Note also that MessageConsumers obtained from a cached Session won't get
078 * closed until the Session will eventually be removed from the pool. This may
079 * lead to semantic side effects in some cases. For a durable subscriber, the
080 * logical {@code Session.close()} call will also close the subscription.
081 * Re-registering a durable consumer for the same subscription on the same
082 * Session handle is not supported; close and reobtain a cached Session first.
083 *
084 * @author Juergen Hoeller
085 * @since 2.5.3
086 */
087public class CachingConnectionFactory extends SingleConnectionFactory {
088
089        private int sessionCacheSize = 1;
090
091        private boolean cacheProducers = true;
092
093        private boolean cacheConsumers = true;
094
095        private volatile boolean active = true;
096
097        private final ConcurrentMap<Integer, LinkedList<Session>> cachedSessions = new ConcurrentHashMap<>();
098
099
100        /**
101         * Create a new CachingConnectionFactory for bean-style usage.
102         * @see #setTargetConnectionFactory
103         */
104        public CachingConnectionFactory() {
105                super();
106                setReconnectOnException(true);
107        }
108
109        /**
110         * Create a new CachingConnectionFactory for the given target
111         * ConnectionFactory.
112         * @param targetConnectionFactory the target ConnectionFactory
113         */
114        public CachingConnectionFactory(ConnectionFactory targetConnectionFactory) {
115                super(targetConnectionFactory);
116                setReconnectOnException(true);
117        }
118
119
120        /**
121         * Specify the desired size for the JMS Session cache (per JMS Session type).
122         * <p>This cache size is the maximum limit for the number of cached Sessions
123         * per session acknowledgement type (auto, client, dups_ok, transacted).
124         * As a consequence, the actual number of cached Sessions may be up to
125         * four times as high as the specified value - in the unlikely case
126         * of mixing and matching different acknowledgement types.
127         * <p>Default is 1: caching a single Session, (re-)creating further ones on
128         * demand. Specify a number like 10 if you'd like to raise the number of cached
129         * Sessions; that said, 1 may be sufficient for low-concurrency scenarios.
130         * @see #setCacheProducers
131         */
132        public void setSessionCacheSize(int sessionCacheSize) {
133                Assert.isTrue(sessionCacheSize >= 1, "Session cache size must be 1 or higher");
134                this.sessionCacheSize = sessionCacheSize;
135        }
136
137        /**
138         * Return the desired size for the JMS Session cache (per JMS Session type).
139         */
140        public int getSessionCacheSize() {
141                return this.sessionCacheSize;
142        }
143
144        /**
145         * Specify whether to cache JMS MessageProducers per JMS Session instance
146         * (more specifically: one MessageProducer per Destination and Session).
147         * <p>Default is "true". Switch this to "false" in order to always
148         * recreate MessageProducers on demand.
149         */
150        public void setCacheProducers(boolean cacheProducers) {
151                this.cacheProducers = cacheProducers;
152        }
153
154        /**
155         * Return whether to cache JMS MessageProducers per JMS Session instance.
156         */
157        public boolean isCacheProducers() {
158                return this.cacheProducers;
159        }
160
161        /**
162         * Specify whether to cache JMS MessageConsumers per JMS Session instance
163         * (more specifically: one MessageConsumer per Destination, selector String
164         * and Session). Note that durable subscribers will only be cached until
165         * logical closing of the Session handle.
166         * <p>Default is "true". Switch this to "false" in order to always
167         * recreate MessageConsumers on demand.
168         */
169        public void setCacheConsumers(boolean cacheConsumers) {
170                this.cacheConsumers = cacheConsumers;
171        }
172
173        /**
174         * Return whether to cache JMS MessageConsumers per JMS Session instance.
175         */
176        public boolean isCacheConsumers() {
177                return this.cacheConsumers;
178        }
179
180
181        /**
182         * Resets the Session cache as well.
183         */
184        @Override
185        public void resetConnection() {
186                this.active = false;
187
188                synchronized (this.cachedSessions) {
189                        for (LinkedList<Session> sessionList : this.cachedSessions.values()) {
190                                synchronized (sessionList) {
191                                        for (Session session : sessionList) {
192                                                try {
193                                                        session.close();
194                                                }
195                                                catch (Throwable ex) {
196                                                        logger.trace("Could not close cached JMS Session", ex);
197                                                }
198                                        }
199                                }
200                        }
201                        this.cachedSessions.clear();
202                }
203
204                // Now proceed with actual closing of the shared Connection...
205                super.resetConnection();
206
207                this.active = true;
208        }
209
210        /**
211         * Checks for a cached Session for the given mode.
212         */
213        @Override
214        protected Session getSession(Connection con, Integer mode) throws JMSException {
215                if (!this.active) {
216                        return null;
217                }
218
219                LinkedList<Session> sessionList = this.cachedSessions.computeIfAbsent(mode, k -> new LinkedList<>());
220                Session session = null;
221                synchronized (sessionList) {
222                        if (!sessionList.isEmpty()) {
223                                session = sessionList.removeFirst();
224                        }
225                }
226                if (session != null) {
227                        if (logger.isTraceEnabled()) {
228                                logger.trace("Found cached JMS Session for mode " + mode + ": " +
229                                                (session instanceof SessionProxy ? ((SessionProxy) session).getTargetSession() : session));
230                        }
231                }
232                else {
233                        Session targetSession = createSession(con, mode);
234                        if (logger.isDebugEnabled()) {
235                                logger.debug("Registering cached JMS Session for mode " + mode + ": " + targetSession);
236                        }
237                        session = getCachedSessionProxy(targetSession, sessionList);
238                }
239                return session;
240        }
241
242        /**
243         * Wrap the given Session with a proxy that delegates every method call to it
244         * but adapts close calls. This is useful for allowing application code to
245         * handle a special framework Session just like an ordinary Session.
246         * @param target the original Session to wrap
247         * @param sessionList the List of cached Sessions that the given Session belongs to
248         * @return the wrapped Session
249         */
250        protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) {
251                List<Class<?>> classes = new ArrayList<>(3);
252                classes.add(SessionProxy.class);
253                if (target instanceof QueueSession) {
254                        classes.add(QueueSession.class);
255                }
256                if (target instanceof TopicSession) {
257                        classes.add(TopicSession.class);
258                }
259                return (Session) Proxy.newProxyInstance(SessionProxy.class.getClassLoader(),
260                                ClassUtils.toClassArray(classes), new CachedSessionInvocationHandler(target, sessionList));
261        }
262
263
264        /**
265         * Invocation handler for a cached JMS Session proxy.
266         */
267        private class CachedSessionInvocationHandler implements InvocationHandler {
268
269                private final Session target;
270
271                private final LinkedList<Session> sessionList;
272
273                private final Map<DestinationCacheKey, MessageProducer> cachedProducers = new HashMap<>();
274
275                private final Map<ConsumerCacheKey, MessageConsumer> cachedConsumers = new HashMap<>();
276
277                private boolean transactionOpen = false;
278
279                public CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList) {
280                        this.target = target;
281                        this.sessionList = sessionList;
282                }
283
284                @Override
285                @Nullable
286                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
287                        String methodName = method.getName();
288                        if (methodName.equals("equals")) {
289                                // Only consider equal when proxies are identical.
290                                return (proxy == args[0]);
291                        }
292                        else if (methodName.equals("hashCode")) {
293                                // Use hashCode of Session proxy.
294                                return System.identityHashCode(proxy);
295                        }
296                        else if (methodName.equals("toString")) {
297                                return "Cached JMS Session: " + this.target;
298                        }
299                        else if (methodName.equals("close")) {
300                                // Handle close method: don't pass the call on.
301                                if (active) {
302                                        synchronized (this.sessionList) {
303                                                if (this.sessionList.size() < getSessionCacheSize()) {
304                                                        try {
305                                                                logicalClose((Session) proxy);
306                                                                // Remain open in the session list.
307                                                                return null;
308                                  
309                                                        catch (JMSException ex) {
310                                                                logger.trace("Logical close of cached JMS Session failed - discarding it", ex);
311                                                                // Proceed to physical close from here...
312                                                        }
313                                                }
314                                        }
315                                }
316                                // If we get here, we're supposed to shut down.
317                                physicalClose();
318                                return null;
319                        }
320                        else if (methodName.equals("getTargetSession")) {
321                                // Handle getTargetSession method: return underlying Session.
322                                return this.target;
323                        }
324                        else if (methodName.equals("commit") || methodName.equals("rollback")) {
325                                this.transactionOpen = false;
326                        }
327                        else if (methodName.startsWith("create")) {
328                                this.transactionOpen = true;
329                                if (isCacheProducers() && (methodName.equals("createProducer") ||
330                                                methodName.equals("createSender") || methodName.equals("createPublisher"))) {
331                                        // Destination argument being null is ok for a producer
332                                        Destination dest = (Destination) args[0];
333                                        if (!(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) {
334                                                return getCachedProducer(dest);
335                                        }
336                                }
337                                else if (isCacheConsumers()) {
338                                        // let raw JMS invocation throw an exception if Destination (i.e. args[0]) is null
339                                        if ((methodName.equals("createConsumer") || methodName.equals("createReceiver") ||
340                                                        methodName.equals("createSubscriber"))) {
341                                                Destination dest = (Destination) args[0];
342                                                if (dest != null && !(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) {
343                                                        return getCachedConsumer(dest,
344                                                                        (args.length > 1 ? (String) args[1] : null),
345                                                                        (args.length > 2 && (Boolean) args[2]),
346                                                                        null,
347                                                                        false);
348                                                }
349                                        }
350                                        else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) {
351                                                Destination dest = (Destination) args[0];
352                                                if (dest != null) {
353                                                        return getCachedConsumer(dest,
354                                                                        (args.length > 2 ? (String) args[2] : null),
355                                                                        (args.length > 3 && (Boolean) args[3]),
356                                                                        (String) args[1],
357                                                                        true);
358                                                }
359                                        }
360                                        else if (methodName.equals("createSharedConsumer")) {
361                                                Destination dest = (Destination) args[0];
362                                                if (dest != null) {
363                                                        return getCachedConsumer(dest,
364                                                                        (args.length > 2 ? (String) args[2] : null),
365                                                                        null,
366                                                                        (String) args[1],
367                                                                        false);
368                                                }
369                                        }
370                                        else if (methodName.equals("createSharedDurableConsumer")) {
371                                                Destination dest = (Destination) args[0];
372                                                if (dest != null) {
373                                                        return getCachedConsumer(dest,
374                                                                        (args.length > 2 ? (String) args[2] : null),
375                                                                        null,
376                                                                        (String) args[1],
377                                                                        true);
378                                                }
379                                        }
380                                }
381                        }
382                        try {
383                                return method.invoke(this.target, args);
384                        }
385                        catch (InvocationTargetException ex) {
386                                throw ex.getTargetException();
387                        }
388                }
389
390                private MessageProducer getCachedProducer(@Nullable Destination dest) throws JMSException {
391                        DestinationCacheKey cacheKey = (dest != null ? new DestinationCacheKey(dest) : null);
392                        MessageProducer producer = this.cachedProducers.get(cacheKey);
393                        if (producer != null) {
394                                if (logger.isTraceEnabled()) {
395                                        logger.trace("Found cached JMS MessageProducer for destination [" + dest + "]: " + producer);
396                                }
397                        }
398                        else {
399                                producer = this.target.createProducer(dest);
400                                if (logger.isDebugEnabled()) {
401                                        logger.debug("Registering cached JMS MessageProducer for destination [" + dest + "]: " + producer);
402                                }
403                                this.cachedProducers.put(cacheKey, producer);
404                        }
405                        return new CachedMessageProducer(producer);
406                }
407
408                private MessageConsumer getCachedConsumer(Destination dest, @Nullable String selector,
409                                @Nullable Boolean noLocal, @Nullable String subscription, boolean durable) throws JMSException {
410
411                        ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription, durable);
412                        MessageConsumer consumer = this.cachedConsumers.get(cacheKey);
413                        if (consumer != null) {
414                                if (logger.isTraceEnabled()) {
415                                        logger.trace("Found cached JMS MessageConsumer for destination [" + dest + "]: " + consumer);
416                                }
417                        }
418                        else {
419                                if (dest instanceof Topic) {
420                                        if (noLocal == null) {
421                                                consumer = (durable ?
422                                                                this.target.createSharedDurableConsumer((Topic) dest, subscription, selector) :
423                                                                this.target.createSharedConsumer((Topic) dest, subscription, selector));
424                                        }
425                                        else {
426                                                consumer = (durable ?
427                                                                this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) :
428                                                                this.target.createConsumer(dest, selector, noLocal));
429                                        }
430                                }
431                                else {
432                                        consumer = this.target.createConsumer(dest, selector);
433                                }
434                                if (logger.isDebugEnabled()) {
435                                        logger.debug("Registering cached JMS MessageConsumer for destination [" + dest + "]: " + consumer);
436                                }
437                                this.cachedConsumers.put(cacheKey, consumer);
438                        }
439                        return new CachedMessageConsumer(consumer);
440                }
441
442                private void logicalClose(Session proxy) throws JMSException {
443                        // Preserve rollback-on-close semantics.
444                        if (this.transactionOpen && this.target.getTransacted()) {
445                                this.transactionOpen = false;
446                                this.target.rollback();
447                        }
448                        // Physically close durable subscribers at time of Session close call.
449                        for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
450                                Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
451                                if (entry.getKey().subscription != null) {
452                                        entry.getValue().close();
453                                        it.remove();
454                                }
455                        }
456                        // Allow for multiple close calls...
457                        boolean returned = false;
458                        synchronized (this.sessionList) {
459                                if (!this.sessionList.contains(proxy)) {
460                                        this.sessionList.addLast(proxy);
461                                        returned = true;
462                                }
463                        }
464                        if (returned && logger.isTraceEnabled()) {
465                                logger.trace("Returned cached Session: " + this.target);
466                        }
467                }
468
469                private void physicalClose() throws JMSException {
470                        if (logger.isDebugEnabled()) {
471                                logger.debug("Closing cached Session: " + this.target);
472                        }
473                        // Explicitly close all MessageProducers and MessageConsumers that
474                        // this Session happens to cache...
475                        try {
476                                for (MessageProducer producer : this.cachedProducers.values()) {
477                                        producer.close();
478                                }
479                                for (MessageConsumer consumer : this.cachedConsumers.values()) {
480                                        consumer.close();
481                                }
482                        }
483                        finally {
484                                this.cachedProducers.clear();
485                                this.cachedConsumers.clear();
486                                // Now actually close the Session.
487                                this.target.close();
488                        }
489                }
490        }
491
492
493        /**
494         * Simple wrapper class around a Destination reference.
495         * Used as the cache key when caching MessageProducer objects.
496         */
497        private static class DestinationCacheKey implements Comparable<DestinationCacheKey> {
498
499                private final Destination destination;
500
501                @Nullable
502                private String destinationString;
503
504                public DestinationCacheKey(Destination destination) {
505                        Assert.notNull(destination, "Destination must not be null");
506                        this.destination = destination;
507                }
508
509                private String getDestinationString() {
510                        if (this.destinationString == null) {
511                                this.destinationString = this.destination.toString();
512                        }
513                        return this.destinationString;
514                }
515
516                protected boolean destinationEquals(DestinationCacheKey otherKey) {
517                        return (this.destination.getClass() == otherKey.destination.getClass() &&
518                                        (this.destination.equals(otherKey.destination) ||
519                                                        getDestinationString().equals(otherKey.getDestinationString())));
520                }
521
522                @Override
523                public boolean equals(@Nullable Object other) {
524                        // Effectively checking object equality as well as toString equality.
525                        // On WebSphere MQ, Destination objects do not implement equals...
526                        return (this == other || (other instanceof DestinationCacheKey &&
527                                        destinationEquals((DestinationCacheKey) other)));
528                }
529
530                @Override
531                public int hashCode() {
532                        // Can't use a more specific hashCode since we can't rely on
533                        // this.destination.hashCode() actually being the same value
534                        // for equivalent destinations... Thanks a lot, WebSphere MQ!
535                        return this.destination.getClass().hashCode();
536                }
537
538                @Override
539                public String toString() {
540                        return getDestinationString();
541                }
542
543                @Override
544                public int compareTo(DestinationCacheKey other) {
545                        return getDestinationString().compareTo(other.getDestinationString());
546                }
547        }
548
549
550        /**
551         * Simple wrapper class around a Destination and other consumer attributes.
552         * Used as the cache key when caching MessageConsumer objects.
553         */
554        private static class ConsumerCacheKey extends DestinationCacheKey {
555
556                @Nullable
557                private final String selector;
558
559                @Nullable
560                private final Boolean noLocal;
561
562                @Nullable
563                private final String subscription;
564
565                private final boolean durable;
566
567                public ConsumerCacheKey(Destination destination, @Nullable String selector, @Nullable Boolean noLocal,
568                                @Nullable String subscription, boolean durable) {
569
570                        super(destination);
571                        this.selector = selector;
572                        this.noLocal = noLocal;
573                        this.subscription = subscription;
574                        this.durable = durable;
575                }
576
577                @Override
578                public boolean equals(@Nullable Object other) {
579                        if (this == other) {
580                                return true;
581                        }
582                        if (!(other instanceof ConsumerCacheKey)) {
583                                return false;
584                        }
585                        ConsumerCacheKey otherKey = (ConsumerCacheKey) other;
586                        return (destinationEquals(otherKey) &&
587                                        ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) &&
588                                        ObjectUtils.nullSafeEquals(this.noLocal, otherKey.noLocal) &&
589                                        ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription) &&
590                                        this.durable == otherKey.durable);
591                }
592
593                @Override
594                public int hashCode() {
595                        return (31 * super.hashCode() + ObjectUtils.nullSafeHashCode(this.selector));
596                }
597
598                @Override
599                public String toString() {
600                        return super.toString() + " [selector=" + this.selector + ", noLocal=" + this.noLocal +
601                                        ", subscription=" + this.subscription + ", durable=" + this.durable + "]";
602                }
603        }
604
605}