001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.connection;
018
019import java.lang.reflect.InvocationHandler;
020import java.lang.reflect.InvocationTargetException;
021import java.lang.reflect.Method;
022import java.lang.reflect.Proxy;
023import java.util.ArrayList;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Map;
029import javax.jms.Connection;
030import javax.jms.ConnectionFactory;
031import javax.jms.Destination;
032import javax.jms.JMSException;
033import javax.jms.MessageConsumer;
034import javax.jms.MessageProducer;
035import javax.jms.QueueSession;
036import javax.jms.Session;
037import javax.jms.TemporaryQueue;
038import javax.jms.TemporaryTopic;
039import javax.jms.Topic;
040import javax.jms.TopicSession;
041
042import org.springframework.util.Assert;
043import org.springframework.util.ClassUtils;
044import org.springframework.util.ObjectUtils;
045import org.springframework.util.ReflectionUtils;
046
047/**
048 * {@link SingleConnectionFactory} subclass that adds {@link javax.jms.Session}
 * caching as well as {@link javax.jms.MessageProducer} caching. This ConnectionFactory
050 * also switches the {@link #setReconnectOnException "reconnectOnException" property}
051 * to "true" by default, allowing for automatic recovery of the underlying Connection.
052 *
053 * <p>By default, only one single Session will be cached, with further requested
054 * Sessions being created and disposed on demand. Consider raising the
055 * {@link #setSessionCacheSize "sessionCacheSize" value} in case of a
056 * high-concurrency environment.
057 *
058 * <p>When using the JMS 1.0.2 API, this ConnectionFactory will switch
059 * into queue/topic mode according to the JMS API methods used at runtime:
060 * {@code createQueueConnection} and {@code createTopicConnection} will
061 * lead to queue/topic mode, respectively; generic {@code createConnection}
062 * calls will lead to a JMS 1.1 connection which is able to serve both modes.
063 *
064 * <p><b>NOTE: This ConnectionFactory requires explicit closing of all Sessions
065 * obtained from its shared Connection.</b> This is the usual recommendation for
066 * native JMS access code anyway. However, with this ConnectionFactory, its use
067 * is mandatory in order to actually allow for Session reuse.
068 *
069 * <p>Note also that MessageConsumers obtained from a cached Session won't get
070 * closed until the Session will eventually be removed from the pool. This may
071 * lead to semantic side effects in some cases. For a durable subscriber, the
072 * logical {@code Session.close()} call will also close the subscription.
073 * Re-registering a durable consumer for the same subscription on the same
074 * Session handle is not supported; close and reobtain a cached Session first.
075 *
076 * @author Juergen Hoeller
077 * @since 2.5.3
078 */
079public class CachingConnectionFactory extends SingleConnectionFactory {
080
081        /** The JMS 2.0 Session.createSharedConsumer method, if available */
082        private static final Method createSharedConsumerMethod = ClassUtils.getMethodIfAvailable(
083                        Session.class, "createSharedConsumer", Topic.class, String.class, String.class);
084
085        /** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
086        private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
087                        Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
088
089
090        private int sessionCacheSize = 1;
091
092        private boolean cacheProducers = true;
093
094        private boolean cacheConsumers = true;
095
096        private volatile boolean active = true;
097
098        private final Map<Integer, LinkedList<Session>> cachedSessions =
099                        new HashMap<Integer, LinkedList<Session>>();
100
101
102        /**
103         * Create a new CachingConnectionFactory for bean-style usage.
104         * @see #setTargetConnectionFactory
105         */
106        public CachingConnectionFactory() {
107                super();
108                setReconnectOnException(true);
109        }
110
111        /**
112         * Create a new CachingConnectionFactory for the given target
113         * ConnectionFactory.
114         * @param targetConnectionFactory the target ConnectionFactory
115         */
116        public CachingConnectionFactory(ConnectionFactory targetConnectionFactory) {
117                super(targetConnectionFactory);
118                setReconnectOnException(true);
119        }
120
121
122        /**
123         * Specify the desired size for the JMS Session cache (per JMS Session type).
124         * <p>This cache size is the maximum limit for the number of cached Sessions
125         * per session acknowledgement type (auto, client, dups_ok, transacted).
126         * As a consequence, the actual number of cached Sessions may be up to
127         * four times as high as the specified value - in the unlikely case
128         * of mixing and matching different acknowledgement types.
129         * <p>Default is 1: caching a single Session, (re-)creating further ones on
130         * demand. Specify a number like 10 if you'd like to raise the number of cached
131         * Sessions; that said, 1 may be sufficient for low-concurrency scenarios.
132         * @see #setCacheProducers
133         */
134        public void setSessionCacheSize(int sessionCacheSize) {
135                Assert.isTrue(sessionCacheSize >= 1, "Session cache size must be 1 or higher");
136                this.sessionCacheSize = sessionCacheSize;
137        }
138
139        /**
140         * Return the desired size for the JMS Session cache (per JMS Session type).
141         */
142        public int getSessionCacheSize() {
143                return this.sessionCacheSize;
144        }
145
146        /**
147         * Specify whether to cache JMS MessageProducers per JMS Session instance
148         * (more specifically: one MessageProducer per Destination and Session).
149         * <p>Default is "true". Switch this to "false" in order to always
150         * recreate MessageProducers on demand.
151         */
152        public void setCacheProducers(boolean cacheProducers) {
153                this.cacheProducers = cacheProducers;
154        }
155
156        /**
157         * Return whether to cache JMS MessageProducers per JMS Session instance.
158         */
159        public boolean isCacheProducers() {
160                return this.cacheProducers;
161        }
162
163        /**
164         * Specify whether to cache JMS MessageConsumers per JMS Session instance
165         * (more specifically: one MessageConsumer per Destination, selector String
166         * and Session). Note that durable subscribers will only be cached until
167         * logical closing of the Session handle.
168         * <p>Default is "true". Switch this to "false" in order to always
169         * recreate MessageConsumers on demand.
170         */
171        public void setCacheConsumers(boolean cacheConsumers) {
172                this.cacheConsumers = cacheConsumers;
173        }
174
175        /**
176         * Return whether to cache JMS MessageConsumers per JMS Session instance.
177         */
178        public boolean isCacheConsumers() {
179                return this.cacheConsumers;
180        }
181
182
183        /**
184         * Resets the Session cache as well.
185         */
186        @Override
187        public void resetConnection() {
188                this.active = false;
189
190                synchronized (this.cachedSessions) {
191                        for (LinkedList<Session> sessionList : this.cachedSessions.values()) {
192                                synchronized (sessionList) {
193                                        for (Session session : sessionList) {
194                                                try {
195                                                        session.close();
196                                                }
197                                                catch (Throwable ex) {
198                                                        logger.trace("Could not close cached JMS Session", ex);
199                                                }
200                                        }
201                                }
202                        }
203                        this.cachedSessions.clear();
204                }
205
206                // Now proceed with actual closing of the shared Connection...
207                super.resetConnection();
208
209                this.active = true;
210        }
211
212        /**
213         * Checks for a cached Session for the given mode.
214         */
215        @Override
216        protected Session getSession(Connection con, Integer mode) throws JMSException {
217                if (!this.active) {
218                        return null;
219                }
220
221                LinkedList<Session> sessionList;
222                synchronized (this.cachedSessions) {
223                        sessionList = this.cachedSessions.get(mode);
224                        if (sessionList == null) {
225                                sessionList = new LinkedList<Session>();
226                                this.cachedSessions.put(mode, sessionList);
227                        }
228                }
229                Session session = null;
230                synchronized (sessionList) {
231                        if (!sessionList.isEmpty()) {
232                                session = sessionList.removeFirst();
233                        }
234                }
235                if (session != null) {
236                        if (logger.isTraceEnabled()) {
237                                logger.trace("Found cached JMS Session for mode " + mode + ": " +
238                                                (session instanceof SessionProxy ? ((SessionProxy) session).getTargetSession() : session));
239                        }
240                }
241                else {
242                        Session targetSession = createSession(con, mode);
243                        if (logger.isDebugEnabled()) {
244                                logger.debug("Registering cached JMS Session for mode " + mode + ": " + targetSession);
245                        }
246                        session = getCachedSessionProxy(targetSession, sessionList);
247                }
248                return session;
249        }
250
251        /**
252         * Wrap the given Session with a proxy that delegates every method call to it
253         * but adapts close calls. This is useful for allowing application code to
254         * handle a special framework Session just like an ordinary Session.
255         * @param target the original Session to wrap
256         * @param sessionList the List of cached Sessions that the given Session belongs to
257         * @return the wrapped Session
258         */
259        protected Session getCachedSessionProxy(Session target, LinkedList<Session> sessionList) {
260                List<Class<?>> classes = new ArrayList<Class<?>>(3);
261                classes.add(SessionProxy.class);
262                if (target instanceof QueueSession) {
263                        classes.add(QueueSession.class);
264                }
265                if (target instanceof TopicSession) {
266                        classes.add(TopicSession.class);
267                }
268                return (Session) Proxy.newProxyInstance(SessionProxy.class.getClassLoader(),
269                                ClassUtils.toClassArray(classes), new CachedSessionInvocationHandler(target, sessionList));
270        }
271
272
273        /**
274         * Invocation handler for a cached JMS Session proxy.
275         */
276        private class CachedSessionInvocationHandler implements InvocationHandler {
277
278                private final Session target;
279
280                private final LinkedList<Session> sessionList;
281
282                private final Map<DestinationCacheKey, MessageProducer> cachedProducers =
283                                new HashMap<DestinationCacheKey, MessageProducer>();
284
285                private final Map<ConsumerCacheKey, MessageConsumer> cachedConsumers =
286                                new HashMap<ConsumerCacheKey, MessageConsumer>();
287
288                private boolean transactionOpen = false;
289
290                public CachedSessionInvocationHandler(Session target, LinkedList<Session> sessionList) {
291                        this.target = target;
292                        this.sessionList = sessionList;
293                }
294
295                @Override
296                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
297                        String methodName = method.getName();
298                        if (methodName.equals("equals")) {
299                                // Only consider equal when proxies are identical.
300                                return (proxy == args[0]);
301                        }
302                        else if (methodName.equals("hashCode")) {
303                                // Use hashCode of Session proxy.
304                                return System.identityHashCode(proxy);
305                        }
306                        else if (methodName.equals("toString")) {
307                                return "Cached JMS Session: " + this.target;
308                        }
309                        else if (methodName.equals("close")) {
310                                // Handle close method: don't pass the call on.
311                                if (active) {
312                                        synchronized (this.sessionList) {
313                                                if (this.sessionList.size() < getSessionCacheSize()) {
314                                                        try {
315                                                                logicalClose((Session) proxy);
316                                                                // Remain open in the session list.
317                                                                return null;
318                                                        }
319                                                        catch (JMSException ex) {
320                                                                logger.trace("Logical close of cached JMS Session failed - discarding it", ex);
321                                                                // Proceed to physical close from here...
322                                                        }
323                                                }
324                                        }
325                                }
326                                // If we get here, we're supposed to shut down.
327                                physicalClose();
328                                return null;
329                        }
330                        else if (methodName.equals("getTargetSession")) {
331                                // Handle getTargetSession method: return underlying Session.
332                                return this.target;
333                        }
334                        else if (methodName.equals("commit") || methodName.equals("rollback")) {
335                                this.transactionOpen = false;
336                        }
337                        else if (methodName.startsWith("create")) {
338                                this.transactionOpen = true;
339                                if (isCacheProducers() && (methodName.equals("createProducer") ||
340                                                methodName.equals("createSender") || methodName.equals("createPublisher"))) {
341                                        // Destination argument being null is ok for a producer
342                                        Destination dest = (Destination) args[0];
343                                        if (!(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) {
344                                                return getCachedProducer(dest);
345                                        }
346                                }
347                                else if (isCacheConsumers()) {
348                                        // let raw JMS invocation throw an exception if Destination (i.e. args[0]) is null
349                                        if ((methodName.equals("createConsumer") || methodName.equals("createReceiver") ||
350                                                        methodName.equals("createSubscriber"))) {
351                                                Destination dest = (Destination) args[0];
352                                                if (dest != null && !(dest instanceof TemporaryQueue || dest instanceof TemporaryTopic)) {
353                                                        return getCachedConsumer(dest,
354                                                                        (args.length > 1 ? (String) args[1] : null),
355                                                                        (args.length > 2 && (Boolean) args[2]),
356                                                                        null,
357                                                                        false);
358                                                }
359                                        }
360                                        else if (methodName.equals("createDurableConsumer") || methodName.equals("createDurableSubscriber")) {
361                                                Destination dest = (Destination) args[0];
362                                                if (dest != null) {
363                                                        return getCachedConsumer(dest,
364                                                                        (args.length > 2 ? (String) args[2] : null),
365                                                                        (args.length > 3 && (Boolean) args[3]),
366                                                                        (String) args[1],
367                                                                        true);
368                                                }
369                                        }
370                                        else if (methodName.equals("createSharedConsumer")) {
371                                                Destination dest = (Destination) args[0];
372                                                if (dest != null) {
373                                                        return getCachedConsumer(dest,
374                                                                        (args.length > 2 ? (String) args[2] : null),
375                                                                        null,
376                                                                        (String) args[1],
377                                                                        false);
378                                                }
379                                        }
380                                        else if (methodName.equals("createSharedDurableConsumer")) {
381                                                Destination dest = (Destination) args[0];
382                                                if (dest != null) {
383                                                        return getCachedConsumer(dest,
384                                                                        (args.length > 2 ? (String) args[2] : null),
385                                                                        null,
386                                                                        (String) args[1],
387                                                                        true);
388                                                }
389                                        }
390                                }
391                        }
392                        try {
393                                return method.invoke(this.target, args);
394                        }
395                        catch (InvocationTargetException ex) {
396                                throw ex.getTargetException();
397                        }
398                }
399
400                private MessageProducer getCachedProducer(Destination dest) throws JMSException {
401                        DestinationCacheKey cacheKey = (dest != null ? new DestinationCacheKey(dest) : null);
402                        MessageProducer producer = this.cachedProducers.get(cacheKey);
403                        if (producer != null) {
404                                if (logger.isTraceEnabled()) {
405                                        logger.trace("Found cached JMS MessageProducer for destination [" + dest + "]: " + producer);
406                                }
407                        }
408                        else {
409                                producer = this.target.createProducer(dest);
410                                if (logger.isDebugEnabled()) {
411                                        logger.debug("Registering cached JMS MessageProducer for destination [" + dest + "]: " + producer);
412                                }
413                                this.cachedProducers.put(cacheKey, producer);
414                        }
415                        return new CachedMessageProducer(producer).getProxyIfNecessary();
416                }
417
418                private MessageConsumer getCachedConsumer(
419                                Destination dest, String selector, Boolean noLocal, String subscription, boolean durable) throws JMSException {
420
421                        ConsumerCacheKey cacheKey = new ConsumerCacheKey(dest, selector, noLocal, subscription, durable);
422                        MessageConsumer consumer = this.cachedConsumers.get(cacheKey);
423                        if (consumer != null) {
424                                if (logger.isTraceEnabled()) {
425                                        logger.trace("Found cached JMS MessageConsumer for destination [" + dest + "]: " + consumer);
426                                }
427                        }
428                        else {
429                                if (dest instanceof Topic) {
430                                        if (noLocal == null) {
431                                                // createSharedConsumer((Topic) dest, subscription, selector);
432                                                // createSharedDurableConsumer((Topic) dest, subscription, selector);
433                                                Method method = (durable ? createSharedDurableConsumerMethod : createSharedConsumerMethod);
434                                                try {
435                                                        consumer = (MessageConsumer) method.invoke(this.target, dest, subscription, selector);
436                                                }
437                                                catch (InvocationTargetException ex) {
438                                                        if (ex.getTargetException() instanceof JMSException) {
439                                                                throw (JMSException) ex.getTargetException();
440                                                        }
441                                                        ReflectionUtils.handleInvocationTargetException(ex);
442                                                }
443                                                catch (IllegalAccessException ex) {
444                                                        throw new IllegalStateException("Could not access JMS 2.0 API method: " + ex.getMessage());
445                                                }
446                                        }
447                                        else {
448                                                consumer = (durable ?
449                                                                this.target.createDurableSubscriber((Topic) dest, subscription, selector, noLocal) :
450                                                                this.target.createConsumer(dest, selector, noLocal));
451                                        }
452                                }
453                                else {
454                                        consumer = this.target.createConsumer(dest, selector);
455                                }
456                                if (logger.isDebugEnabled()) {
457                                        logger.debug("Registering cached JMS MessageConsumer for destination [" + dest + "]: " + consumer);
458                                }
459                                this.cachedConsumers.put(cacheKey, consumer);
460                        }
461                        return new CachedMessageConsumer(consumer);
462                }
463
464                private void logicalClose(Session proxy) throws JMSException {
465                        // Preserve rollback-on-close semantics.
466                        if (this.transactionOpen && this.target.getTransacted()) {
467                                this.transactionOpen = false;
468                                this.target.rollback();
469                        }
470                        // Physically close durable subscribers at time of Session close call.
471                        for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
472                                Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
473                                if (entry.getKey().subscription != null) {
474                                        entry.getValue().close();
475                                        it.remove();
476                                }
477                        }
478                        // Allow for multiple close calls...
479                        boolean returned = false;
480                        synchronized (this.sessionList) {
481                                if (!this.sessionList.contains(proxy)) {
482                                        this.sessionList.addLast(proxy);
483                                        returned = true;
484                                }
485                        }
486                        if (returned && logger.isTraceEnabled()) {
487                                logger.trace("Returned cached Session: " + this.target);
488                        }
489                }
490
491                private void physicalClose() throws JMSException {
492                        if (logger.isDebugEnabled()) {
493                                logger.debug("Closing cached Session: " + this.target);
494                        }
495                        // Explicitly close all MessageProducers and MessageConsumers that
496                        // this Session happens to cache...
497                        try {
498                                for (MessageProducer producer : this.cachedProducers.values()) {
499                                        producer.close();
500                                }
501                                for (MessageConsumer consumer : this.cachedConsumers.values()) {
502                                        consumer.close();
503                                }
504                        }
505                        finally {
506                                this.cachedProducers.clear();
507                                this.cachedConsumers.clear();
508                                // Now actually close the Session.
509                                this.target.close();
510                        }
511                }
512        }
513
514
515        /**
516         * Simple wrapper class around a Destination reference.
517         * Used as the cache key when caching MessageProducer objects.
518         */
519        private static class DestinationCacheKey implements Comparable<DestinationCacheKey> {
520
521                private final Destination destination;
522
523                private String destinationString;
524
525                public DestinationCacheKey(Destination destination) {
526                        Assert.notNull(destination, "Destination must not be null");
527                        this.destination = destination;
528                }
529
530                private String getDestinationString() {
531                        if (this.destinationString == null) {
532                                this.destinationString = this.destination.toString();
533                        }
534                        return this.destinationString;
535                }
536
537                protected boolean destinationEquals(DestinationCacheKey otherKey) {
538                        return (this.destination.getClass() == otherKey.destination.getClass() &&
539                                        (this.destination.equals(otherKey.destination) ||
540                                                        getDestinationString().equals(otherKey.getDestinationString())));
541                }
542
543                @Override
544                public boolean equals(Object other) {
545                        // Effectively checking object equality as well as toString equality.
546                        // On WebSphere MQ, Destination objects do not implement equals...
547                        return (this == other || destinationEquals((DestinationCacheKey) other));
548                }
549
550                @Override
551                public int hashCode() {
552                        // Can't use a more specific hashCode since we can't rely on
553                        // this.destination.hashCode() actually being the same value
554                        // for equivalent destinations... Thanks a lot, WebSphere MQ!
555                        return this.destination.getClass().hashCode();
556                }
557
558                @Override
559                public String toString() {
560                        return getDestinationString();
561                }
562
563                @Override
564                public int compareTo(DestinationCacheKey other) {
565                        return getDestinationString().compareTo(other.getDestinationString());
566                }
567        }
568
569
570        /**
571         * Simple wrapper class around a Destination and other consumer attributes.
572         * Used as the cache key when caching MessageConsumer objects.
573         */
574        private static class ConsumerCacheKey extends DestinationCacheKey {
575
576                private final String selector;
577
578                private final Boolean noLocal;
579
580                private final String subscription;
581
582                private final boolean durable;
583
584                public ConsumerCacheKey(Destination destination, String selector, Boolean noLocal, String subscription, boolean durable) {
585                        super(destination);
586                        this.selector = selector;
587                        this.noLocal = noLocal;
588                        this.subscription = subscription;
589                        this.durable = durable;
590                }
591
592                @Override
593                public boolean equals(Object other) {
594                        if (this == other) {
595                                return true;
596                        }
597                        ConsumerCacheKey otherKey = (ConsumerCacheKey) other;
598                        return (destinationEquals(otherKey) &&
599                                        ObjectUtils.nullSafeEquals(this.selector, otherKey.selector) &&
600                                        ObjectUtils.nullSafeEquals(this.noLocal, otherKey.noLocal) &&
601                                        ObjectUtils.nullSafeEquals(this.subscription, otherKey.subscription) &&
602                                        this.durable == otherKey.durable);
603                }
604
605                @Override
606                public String toString() {
607                        return super.toString() + " [selector=" + this.selector + ", noLocal=" + this.noLocal +
608                                        ", subscription=" + this.subscription + ", durable=" + this.durable + "]";
609                }
610        }
611
612}