001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.connection;
018
019import java.lang.reflect.InvocationHandler;
020import java.lang.reflect.InvocationTargetException;
021import java.lang.reflect.Method;
022import java.lang.reflect.Proxy;
023import java.util.ArrayList;
024import java.util.LinkedHashSet;
025import java.util.List;
026import java.util.Set;
027import javax.jms.Connection;
028import javax.jms.ConnectionFactory;
029import javax.jms.ExceptionListener;
030import javax.jms.JMSException;
031import javax.jms.QueueConnection;
032import javax.jms.QueueConnectionFactory;
033import javax.jms.Session;
034import javax.jms.TopicConnection;
035import javax.jms.TopicConnectionFactory;
036
037import org.apache.commons.logging.Log;
038import org.apache.commons.logging.LogFactory;
039
040import org.springframework.beans.factory.DisposableBean;
041import org.springframework.beans.factory.InitializingBean;
042import org.springframework.util.Assert;
043import org.springframework.util.ClassUtils;
044
045/**
046 * A JMS ConnectionFactory adapter that returns the same Connection
047 * from all {@link #createConnection()} calls, and ignores calls to
048 * {@link javax.jms.Connection#close()}. According to the JMS Connection
049 * model, this is perfectly thread-safe (in contrast to e.g. JDBC). The
050 * shared Connection can be automatically recovered in case of an Exception.
051 *
052 * <p>You can either pass in a specific JMS Connection directly or let this
053 * factory lazily create a Connection via a given target ConnectionFactory.
054 * This factory generally works with JMS 1.1 as well as the JMS 1.0.2 API.
055 *
056 * <p>Note that when using the JMS 1.0.2 API, this ConnectionFactory will switch
057 * into queue/topic mode according to the JMS API methods used at runtime:
058 * {@code createQueueConnection} and {@code createTopicConnection} will
059 * lead to queue/topic mode, respectively; generic {@code createConnection}
060 * calls will lead to a JMS 1.1 connection which is able to serve both modes.
061 *
062 * <p>Useful for testing and standalone environments in order to keep using the
063 * same Connection for multiple {@link org.springframework.jms.core.JmsTemplate}
064 * calls, without having a pooling ConnectionFactory underneath. This may span
065 * any number of transactions, even concurrently executing transactions.
066 *
067 * <p>Note that Spring's message listener containers support the use of
068 * a shared Connection within each listener container instance. Using
069 * SingleConnectionFactory in combination only really makes sense for
070 * sharing a single JMS Connection <i>across multiple listener containers</i>.
071 *
072 * @author Juergen Hoeller
073 * @author Mark Pollack
074 * @since 1.1
075 * @see org.springframework.jms.core.JmsTemplate
076 * @see org.springframework.jms.listener.SimpleMessageListenerContainer
077 * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setCacheLevel
078 */
079public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
080                TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean {
081
082        protected final Log logger = LogFactory.getLog(getClass());
083
084        private ConnectionFactory targetConnectionFactory;
085
086        private String clientId;
087
088        private ExceptionListener exceptionListener;
089
090        private boolean reconnectOnException = false;
091
092        /** The target Connection */
093        private Connection connection;
094
095        /** A hint whether to create a queue or topic connection */
096        private Boolean pubSubMode;
097
098        /** An internal aggregator allowing for per-connection ExceptionListeners */
099        private AggregatedExceptionListener aggregatedExceptionListener;
100
101        /** Whether the shared Connection has been started */
102        private int startedCount = 0;
103
104        /** Synchronization monitor for the shared Connection */
105        private final Object connectionMonitor = new Object();
106
107
108        /**
109         * Create a new SingleConnectionFactory for bean-style usage.
110         * @see #setTargetConnectionFactory
111         */
112        public SingleConnectionFactory() {
113        }
114
115        /**
116         * Create a new SingleConnectionFactory that always returns the given Connection.
117         * @param targetConnection the single Connection
118         */
119        public SingleConnectionFactory(Connection targetConnection) {
120                Assert.notNull(targetConnection, "Target Connection must not be null");
121                this.connection = targetConnection;
122        }
123
124        /**
125         * Create a new SingleConnectionFactory that always returns a single Connection
126         * that it will lazily create via the given target ConnectionFactory.
127         * @param targetConnectionFactory the target ConnectionFactory
128         */
129        public SingleConnectionFactory(ConnectionFactory targetConnectionFactory) {
130                Assert.notNull(targetConnectionFactory, "Target ConnectionFactory must not be null");
131                this.targetConnectionFactory = targetConnectionFactory;
132        }
133
134
135        /**
136         * Set the target ConnectionFactory which will be used to lazily
137         * create a single Connection.
138         */
139        public void setTargetConnectionFactory(ConnectionFactory targetConnectionFactory) {
140                this.targetConnectionFactory = targetConnectionFactory;
141        }
142
143        /**
144         * Return the target ConnectionFactory which will be used to lazily
145         * create a single Connection, if any.
146         */
147        public ConnectionFactory getTargetConnectionFactory() {
148                return this.targetConnectionFactory;
149        }
150
151        /**
152         * Specify a JMS client ID for the single Connection created and exposed
153         * by this ConnectionFactory.
154         * <p>Note that client IDs need to be unique among all active Connections
155         * of the underlying JMS provider. Furthermore, a client ID can only be
156         * assigned if the original ConnectionFactory hasn't already assigned one.
157         * @see javax.jms.Connection#setClientID
158         * @see #setTargetConnectionFactory
159         */
160        public void setClientId(String clientId) {
161                this.clientId = clientId;
162        }
163
164        /**
165         * Return a JMS client ID for the single Connection created and exposed
166         * by this ConnectionFactory, if any.
167         */
168        protected String getClientId() {
169                return this.clientId;
170        }
171
172        /**
173         * Specify an JMS ExceptionListener implementation that should be
174         * registered with the single Connection created by this factory.
175         * @see #setReconnectOnException
176         */
177        public void setExceptionListener(ExceptionListener exceptionListener) {
178                this.exceptionListener = exceptionListener;
179        }
180
181        /**
182         * Return the JMS ExceptionListener implementation that should be registered
183         * with the single Connection created by this factory, if any.
184         */
185        protected ExceptionListener getExceptionListener() {
186                return this.exceptionListener;
187        }
188
189        /**
190         * Specify whether the single Connection should be reset (to be subsequently renewed)
191         * when a JMSException is reported by the underlying Connection.
192         * <p>Default is "false". Switch this to "true" to automatically trigger
193         * recovery based on your JMS provider's exception notifications.
194         * <p>Internally, this will lead to a special JMS ExceptionListener
195         * (this SingleConnectionFactory itself) being registered with the
196         * underlying Connection. This can also be combined with a
197         * user-specified ExceptionListener, if desired.
198         * @see #setExceptionListener
199         */
200        public void setReconnectOnException(boolean reconnectOnException) {
201                this.reconnectOnException = reconnectOnException;
202        }
203
204        /**
205         * Return whether the single Connection should be renewed when
206         * a JMSException is reported by the underlying Connection.
207         */
208        protected boolean isReconnectOnException() {
209                return this.reconnectOnException;
210        }
211
212        /**
213         * Make sure a Connection or ConnectionFactory has been set.
214         */
215        @Override
216        public void afterPropertiesSet() {
217                if (this.connection == null && getTargetConnectionFactory() == null) {
218                        throw new IllegalArgumentException("Target Connection or ConnectionFactory is required");
219                }
220        }
221
222
223        @Override
224        public Connection createConnection() throws JMSException {
225                return getSharedConnectionProxy(getConnection());
226        }
227
228        @Override
229        public Connection createConnection(String username, String password) throws JMSException {
230                throw new javax.jms.IllegalStateException(
231                                "SingleConnectionFactory does not support custom username and password");
232        }
233
234        @Override
235        public QueueConnection createQueueConnection() throws JMSException {
236                Connection con;
237                synchronized (this.connectionMonitor) {
238                        this.pubSubMode = Boolean.FALSE;
239                        con = createConnection();
240                }
241                if (!(con instanceof QueueConnection)) {
242                        throw new javax.jms.IllegalStateException(
243                                        "This SingleConnectionFactory does not hold a QueueConnection but rather: " + con);
244                }
245                return ((QueueConnection) con);
246        }
247
248        @Override
249        public QueueConnection createQueueConnection(String username, String password) throws JMSException {
250                throw new javax.jms.IllegalStateException(
251                                "SingleConnectionFactory does not support custom username and password");
252        }
253
254        @Override
255        public TopicConnection createTopicConnection() throws JMSException {
256                Connection con;
257                synchronized (this.connectionMonitor) {
258                        this.pubSubMode = Boolean.TRUE;
259                        con = createConnection();
260                }
261                if (!(con instanceof TopicConnection)) {
262                        throw new javax.jms.IllegalStateException(
263                                        "This SingleConnectionFactory does not hold a TopicConnection but rather: " + con);
264                }
265                return ((TopicConnection) con);
266        }
267
268        @Override
269        public TopicConnection createTopicConnection(String username, String password) throws JMSException {
270                throw new javax.jms.IllegalStateException(
271                                "SingleConnectionFactory does not support custom username and password");
272        }
273
274
275        /**
276         * Obtain an initialized shared Connection.
277         * @return the Connection (never {@code null})
278         * @throws javax.jms.JMSException if thrown by JMS API methods
279         * @see #initConnection()
280         */
281        protected Connection getConnection() throws JMSException {
282                synchronized (this.connectionMonitor) {
283                        if (this.connection == null) {
284                                initConnection();
285                        }
286                        return this.connection;
287                }
288        }
289
290        /**
291         * Initialize the underlying shared Connection.
292         * <p>Closes and reinitializes the Connection if an underlying
293         * Connection is present already.
294         * @throws javax.jms.JMSException if thrown by JMS API methods
295         * @see #prepareConnection
296         */
297        public void initConnection() throws JMSException {
298                if (getTargetConnectionFactory() == null) {
299                        throw new IllegalStateException(
300                                        "'targetConnectionFactory' is required for lazily initializing a Connection");
301                }
302                synchronized (this.connectionMonitor) {
303                        if (this.connection != null) {
304                                closeConnection(this.connection);
305                        }
306                        this.connection = doCreateConnection();
307                        prepareConnection(this.connection);
308                        if (this.startedCount > 0) {
309                                this.connection.start();
310                        }
311                        if (logger.isInfoEnabled()) {
312                                logger.info("Established shared JMS Connection: " + this.connection);
313                        }
314                }
315        }
316
317        /**
318         * Exception listener callback that renews the underlying single Connection.
319         * @see #resetConnection()
320         */
321        @Override
322        public void onException(JMSException ex) {
323                logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex);
324                resetConnection();
325        }
326
327        /**
328         * Close the underlying shared connection.
329         * The provider of this ConnectionFactory needs to care for proper shutdown.
330         * <p>As this bean implements DisposableBean, a bean factory will
331         * automatically invoke this on destruction of its cached singletons.
332         * @see #resetConnection()
333         */
334        @Override
335        public void destroy() {
336                resetConnection();
337        }
338
339        /**
340         * Reset the underlying shared Connection, to be reinitialized on next access.
341         * @see #closeConnection
342         */
343        public void resetConnection() {
344                synchronized (this.connectionMonitor) {
345                        if (this.connection != null) {
346                                closeConnection(this.connection);
347                        }
348                        this.connection = null;
349                }
350        }
351
352        /**
353         * Create a JMS Connection via this template's ConnectionFactory.
354         * @return the new JMS Connection
355         * @throws javax.jms.JMSException if thrown by JMS API methods
356         */
357        protected Connection doCreateConnection() throws JMSException {
358                ConnectionFactory cf = getTargetConnectionFactory();
359                if (Boolean.FALSE.equals(this.pubSubMode) && cf instanceof QueueConnectionFactory) {
360                        return ((QueueConnectionFactory) cf).createQueueConnection();
361                }
362                else if (Boolean.TRUE.equals(this.pubSubMode) && cf instanceof TopicConnectionFactory) {
363                        return ((TopicConnectionFactory) cf).createTopicConnection();
364                }
365                else {
366                        return getTargetConnectionFactory().createConnection();
367                }
368        }
369
370        /**
371         * Prepare the given Connection before it is exposed.
372         * <p>The default implementation applies ExceptionListener and client id.
373         * Can be overridden in subclasses.
374         * @param con the Connection to prepare
375         * @throws JMSException if thrown by JMS API methods
376         * @see #setExceptionListener
377         * @see #setReconnectOnException
378         */
379        protected void prepareConnection(Connection con) throws JMSException {
380                if (getClientId() != null) {
381                        con.setClientID(getClientId());
382                }
383                if (this.aggregatedExceptionListener != null) {
384                        con.setExceptionListener(this.aggregatedExceptionListener);
385                }
386                else if (getExceptionListener() != null || isReconnectOnException()) {
387                        ExceptionListener listenerToUse = getExceptionListener();
388                        if (isReconnectOnException()) {
389                                this.aggregatedExceptionListener = new AggregatedExceptionListener();
390                                this.aggregatedExceptionListener.delegates.add(this);
391                                if (listenerToUse != null) {
392                                        this.aggregatedExceptionListener.delegates.add(listenerToUse);
393                                }
394                                listenerToUse = this.aggregatedExceptionListener;
395                        }
396                        con.setExceptionListener(listenerToUse);
397                }
398        }
399
400        /**
401         * Template method for obtaining a (potentially cached) Session.
402         * <p>The default implementation always returns {@code null}.
403         * Subclasses may override this for exposing specific Session handles,
404         * possibly delegating to {@link #createSession} for the creation of raw
405         * Session objects that will then get wrapped and returned from here.
406         * @param con the JMS Connection to operate on
407         * @param mode the Session acknowledgement mode
408         * ({@code Session.TRANSACTED} or one of the common modes)
409         * @return the Session to use, or {@code null} to indicate
410         * creation of a raw standard Session
411         * @throws JMSException if thrown by the JMS API
412         */
413        protected Session getSession(Connection con, Integer mode) throws JMSException {
414                return null;
415        }
416
417        /**
418         * Create a default Session for this ConnectionFactory,
419         * adapting to JMS 1.0.2 style queue/topic mode if necessary.
420         * @param con the JMS Connection to operate on
421         * @param mode the Session acknowledgement mode
422         * ({@code Session.TRANSACTED} or one of the common modes)
423         * @return the newly created Session
424         * @throws JMSException if thrown by the JMS API
425         */
426        protected Session createSession(Connection con, Integer mode) throws JMSException {
427                // Determine JMS API arguments...
428                boolean transacted = (mode == Session.SESSION_TRANSACTED);
429                int ackMode = (transacted ? Session.AUTO_ACKNOWLEDGE : mode);
430                // Now actually call the appropriate JMS factory method...
431                if (Boolean.FALSE.equals(this.pubSubMode) && con instanceof QueueConnection) {
432                        return ((QueueConnection) con).createQueueSession(transacted, ackMode);
433                }
434                else if (Boolean.TRUE.equals(this.pubSubMode) && con instanceof TopicConnection) {
435                        return ((TopicConnection) con).createTopicSession(transacted, ackMode);
436                }
437                else {
438                        return con.createSession(transacted, ackMode);
439                }
440        }
441
442        /**
443         * Close the given Connection.
444         * @param con the Connection to close
445         */
446        protected void closeConnection(Connection con) {
447                if (logger.isDebugEnabled()) {
448                        logger.debug("Closing shared JMS Connection: " + con);
449                }
450                try {
451                        try {
452                                if (this.startedCount > 0) {
453                                        con.stop();
454                                }
455                        }
456                        finally {
457                                con.close();
458                        }
459                }
460                catch (javax.jms.IllegalStateException ex) {
461                        logger.debug("Ignoring Connection state exception - assuming already closed: " + ex);
462                }
463                catch (Throwable ex) {
464                        logger.debug("Could not close shared JMS Connection", ex);
465                }
466        }
467
468        /**
469         * Wrap the given Connection with a proxy that delegates every method call to it
470         * but suppresses close calls. This is useful for allowing application code to
471         * handle a special framework Connection just like an ordinary Connection from a
472         * JMS ConnectionFactory.
473         * @param target the original Connection to wrap
474         * @return the wrapped Connection
475         */
476        protected Connection getSharedConnectionProxy(Connection target) {
477                List<Class<?>> classes = new ArrayList<Class<?>>(3);
478                classes.add(Connection.class);
479                if (target instanceof QueueConnection) {
480                        classes.add(QueueConnection.class);
481                }
482                if (target instanceof TopicConnection) {
483                        classes.add(TopicConnection.class);
484                }
485                return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(),
486                                ClassUtils.toClassArray(classes), new SharedConnectionInvocationHandler());
487        }
488
489
490        /**
491         * Invocation handler for a cached JMS Connection proxy.
492         */
493        private class SharedConnectionInvocationHandler implements InvocationHandler {
494
495                private ExceptionListener localExceptionListener;
496
497                private boolean locallyStarted = false;
498
499                @Override
500                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
501                        if (method.getName().equals("equals")) {
502                                Object other = args[0];
503                                if (proxy == other) {
504                                        return true;
505                                }
506                                if (other == null || !Proxy.isProxyClass(other.getClass())) {
507                                        return false;
508                                }
509                                InvocationHandler otherHandler = Proxy.getInvocationHandler(other);
510                                return (otherHandler instanceof SharedConnectionInvocationHandler &&
511                                                factory() == ((SharedConnectionInvocationHandler) otherHandler).factory());
512                        }
513                        else if (method.getName().equals("hashCode")) {
514                                // Use hashCode of containing SingleConnectionFactory.
515                                return System.identityHashCode(factory());
516                        }
517                        else if (method.getName().equals("toString")) {
518                                return "Shared JMS Connection: " + getConnection();
519                        }
520                        else if (method.getName().equals("setClientID")) {
521                                // Handle setClientID method: throw exception if not compatible.
522                                String currentClientId = getConnection().getClientID();
523                                if (currentClientId != null && currentClientId.equals(args[0])) {
524                                        return null;
525                                }
526                                else {
527                                        throw new javax.jms.IllegalStateException(
528                                                        "setClientID call not supported on proxy for shared Connection. " +
529                                                        "Set the 'clientId' property on the SingleConnectionFactory instead.");
530                                }
531                        }
532                        else if (method.getName().equals("setExceptionListener")) {
533                                // Handle setExceptionListener method: add to the chain.
534                                synchronized (connectionMonitor) {
535                                        if (aggregatedExceptionListener != null) {
536                                                ExceptionListener listener = (ExceptionListener) args[0];
537                                                if (listener != this.localExceptionListener) {
538                                                        if (this.localExceptionListener != null) {
539                                                                aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
540                                                        }
541                                                        if (listener != null) {
542                                                                aggregatedExceptionListener.delegates.add(listener);
543                                                        }
544                                                        this.localExceptionListener = listener;
545                                                }
546                                                return null;
547                                        }
548                                        else {
549                                                throw new javax.jms.IllegalStateException(
550                                                                "setExceptionListener call not supported on proxy for shared Connection. " +
551                                                                "Set the 'exceptionListener' property on the SingleConnectionFactory instead. " +
552                                                                "Alternatively, activate SingleConnectionFactory's 'reconnectOnException' feature, " +
553                                                                "which will allow for registering further ExceptionListeners to the recovery chain.");
554                                        }
555                                }
556                        }
557                        else if (method.getName().equals("getExceptionListener")) {
558                                synchronized (connectionMonitor) {
559                                        if (this.localExceptionListener != null) {
560                                                return this.localExceptionListener;
561                                        }
562                                        else {
563                                                return getExceptionListener();
564                                        }
565                                }
566                        }
567                        else if (method.getName().equals("start")) {
568                                localStart();
569                                return null;
570                        }
571                        else if (method.getName().equals("stop")) {
572                                localStop();
573                                return null;
574                        }
575                        else if (method.getName().equals("close")) {
576                                localStop();
577                                synchronized (connectionMonitor) {
578                                        if (this.localExceptionListener != null) {
579                                                if (aggregatedExceptionListener != null) {
580                                                        aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
581                                                }
582                                                this.localExceptionListener = null;
583                                        }
584                                }
585                                return null;
586                        }
587                        else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") ||
588                                        method.getName().equals("createTopicSession")) {
589                                // Default: JMS 2.0 createSession() method
590                                Integer mode = Session.AUTO_ACKNOWLEDGE;
591                                if (args != null) {
592                                        if (args.length == 1) {
593                                                // JMS 2.0 createSession(int) method
594                                                mode = (Integer) args[0];
595                                        }
596                                        else if (args.length == 2) {
597                                                // JMS 1.1 createSession(boolean, int) method
598                                                boolean transacted = (Boolean) args[0];
599                                                Integer ackMode = (Integer) args[1];
600                                                mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
601                                        }
602                                }
603                                Session session = getSession(getConnection(), mode);
604                                if (session != null) {
605                                        if (!method.getReturnType().isInstance(session)) {
606                                                String msg = "JMS Session does not implement specific domain: " + session;
607                                                try {
608                                                        session.close();
609                                                }
610                                                catch (Throwable ex) {
611                                                        logger.trace("Failed to close newly obtained JMS Session", ex);
612                                                }
613                                                throw new javax.jms.IllegalStateException(msg);
614                                        }
615                                        return session;
616                                }
617                        }
618                        try {
619                                return method.invoke(getConnection(), args);
620                        }
621                        catch (InvocationTargetException ex) {
622                                throw ex.getTargetException();
623                        }
624                }
625
626                private void localStart() throws JMSException {
627                        synchronized (connectionMonitor) {
628                                if (!this.locallyStarted) {
629                                        this.locallyStarted = true;
630                                        if (startedCount == 0 && connection != null) {
631                                                connection.start();
632                                        }
633                                        startedCount++;
634                                }
635                        }
636                }
637
638                private void localStop() throws JMSException {
639                        synchronized (connectionMonitor) {
640                                if (this.locallyStarted) {
641                                        this.locallyStarted = false;
642                                        if (startedCount == 1 && connection != null) {
643                                                connection.stop();
644                                        }
645                                        if (startedCount > 0) {
646                                                startedCount--;
647                                        }
648                                }
649                        }
650                }
651
652                private SingleConnectionFactory factory() {
653                        return SingleConnectionFactory.this;
654                }
655        }
656
657
658        /**
659         * Internal aggregated ExceptionListener for handling the internal
660         * recovery listener in combination with user-specified listeners.
661         */
662        private class AggregatedExceptionListener implements ExceptionListener {
663
664                final Set<ExceptionListener> delegates = new LinkedHashSet<ExceptionListener>(2);
665
666                @Override
667                public void onException(JMSException ex) {
668                        // Iterate over temporary copy in order to avoid ConcurrentModificationException,
669                        // since listener invocations may in turn trigger registration of listeners...
670                        Set<ExceptionListener> copy;
671                        synchronized (connectionMonitor) {
672                                copy = new LinkedHashSet<ExceptionListener>(this.delegates);
673                        }
674                        for (ExceptionListener listener : copy) {
675                                listener.onException(ex);
676                        }
677                }
678        }
679
680}