001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.connection;
018
019import java.lang.reflect.InvocationHandler;
020import java.lang.reflect.InvocationTargetException;
021import java.lang.reflect.Method;
022import java.lang.reflect.Proxy;
023import java.util.ArrayList;
024import java.util.LinkedHashSet;
025import java.util.List;
026import java.util.Set;
027
028import javax.jms.Connection;
029import javax.jms.ConnectionFactory;
030import javax.jms.ExceptionListener;
031import javax.jms.JMSContext;
032import javax.jms.JMSException;
033import javax.jms.QueueConnection;
034import javax.jms.QueueConnectionFactory;
035import javax.jms.Session;
036import javax.jms.TopicConnection;
037import javax.jms.TopicConnectionFactory;
038
039import org.apache.commons.logging.Log;
040import org.apache.commons.logging.LogFactory;
041
042import org.springframework.beans.factory.DisposableBean;
043import org.springframework.beans.factory.InitializingBean;
044import org.springframework.lang.Nullable;
045import org.springframework.util.Assert;
046import org.springframework.util.ClassUtils;
047
048/**
049 * A JMS ConnectionFactory adapter that returns the same Connection
050 * from all {@link #createConnection()} calls, and ignores calls to
051 * {@link javax.jms.Connection#close()}. According to the JMS Connection
052 * model, this is perfectly thread-safe (in contrast to e.g. JDBC). The
053 * shared Connection can be automatically recovered in case of an Exception.
054 *
055 * <p>You can either pass in a specific JMS Connection directly or let this
056 * factory lazily create a Connection via a given target ConnectionFactory.
057 * This factory generally works with JMS 1.1 as well as the JMS 1.0.2 API.
058 *
059 * <p>Note that when using the JMS 1.0.2 API, this ConnectionFactory will switch
060 * into queue/topic mode according to the JMS API methods used at runtime:
061 * {@code createQueueConnection} and {@code createTopicConnection} will
062 * lead to queue/topic mode, respectively; generic {@code createConnection}
063 * calls will lead to a JMS 1.1 connection which is able to serve both modes.
064 *
065 * <p>As of Spring Framework 5, this class supports JMS 2.0 {@code JMSContext}
066 * calls and therefore requires the JMS 2.0 API to be present at runtime.
067 * It may nevertheless run against a JMS 1.1 driver (bound to the JMS 2.0 API)
068 * as long as no actual JMS 2.0 calls are triggered by the application's setup.
069 *
070 * <p>Useful for testing and standalone environments in order to keep using the
071 * same Connection for multiple {@link org.springframework.jms.core.JmsTemplate}
072 * calls, without having a pooling ConnectionFactory underneath. This may span
073 * any number of transactions, even concurrently executing transactions.
074 *
075 * <p>Note that Spring's message listener containers support the use of
076 * a shared Connection within each listener container instance. Using
077 * SingleConnectionFactory in combination only really makes sense for
078 * sharing a single JMS Connection <i>across multiple listener containers</i>.
079 *
080 * @author Juergen Hoeller
081 * @author Mark Pollack
082 * @since 1.1
083 * @see org.springframework.jms.core.JmsTemplate
084 * @see org.springframework.jms.listener.SimpleMessageListenerContainer
085 * @see org.springframework.jms.listener.DefaultMessageListenerContainer#setCacheLevel
086 */
087public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
088                TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean {
089
090        protected final Log logger = LogFactory.getLog(getClass());
091
092        @Nullable
093        private ConnectionFactory targetConnectionFactory;
094
095        @Nullable
096        private String clientId;
097
098        @Nullable
099        private ExceptionListener exceptionListener;
100
101        private boolean reconnectOnException = false;
102
103        /** The target Connection. */
104        @Nullable
105        private Connection connection;
106
107        /** A hint whether to create a queue or topic connection. */
108        @Nullable
109        private Boolean pubSubMode;
110
111        /** An internal aggregator allowing for per-connection ExceptionListeners. */
112        @Nullable
113        private AggregatedExceptionListener aggregatedExceptionListener;
114
115        /** Whether the shared Connection has been started. */
116        private int startedCount = 0;
117
118        /** Synchronization monitor for the shared Connection. */
119        private final Object connectionMonitor = new Object();
120
121
122        /**
123         * Create a new SingleConnectionFactory for bean-style usage.
124         * @see #setTargetConnectionFactory
125         */
126        public SingleConnectionFactory() {
127        }
128
/**
 * Create a new SingleConnectionFactory that always hands out the given Connection.
 * @param targetConnection the single Connection to share (must not be {@code null})
 */
public SingleConnectionFactory(Connection targetConnection) {
        Assert.notNull(targetConnection, "Target Connection must not be null");
        this.connection = targetConnection;
}
137
/**
 * Create a new SingleConnectionFactory whose single Connection gets created
 * lazily through the given target ConnectionFactory.
 * @param targetConnectionFactory the target ConnectionFactory (must not be {@code null})
 */
public SingleConnectionFactory(ConnectionFactory targetConnectionFactory) {
        Assert.notNull(targetConnectionFactory, "Target ConnectionFactory must not be null");
        this.targetConnectionFactory = targetConnectionFactory;
}
147
148
149        /**
150         * Set the target ConnectionFactory which will be used to lazily
151         * create a single Connection.
152         */
153        public void setTargetConnectionFactory(@Nullable ConnectionFactory targetConnectionFactory) {
154                this.targetConnectionFactory = targetConnectionFactory;
155        }
156
157        /**
158         * Return the target ConnectionFactory which will be used to lazily
159         * create a single Connection, if any.
160         */
161        @Nullable
162        public ConnectionFactory getTargetConnectionFactory() {
163                return this.targetConnectionFactory;
164        }
165
166        /**
167         * Specify a JMS client ID for the single Connection created and exposed
168         * by this ConnectionFactory.
169         * <p>Note that client IDs need to be unique among all active Connections
170         * of the underlying JMS provider. Furthermore, a client ID can only be
171         * assigned if the original ConnectionFactory hasn't already assigned one.
172         * @see javax.jms.Connection#setClientID
173         * @see #setTargetConnectionFactory
174         */
175        public void setClientId(@Nullable String clientId) {
176                this.clientId = clientId;
177        }
178
179        /**
180         * Return a JMS client ID for the single Connection created and exposed
181         * by this ConnectionFactory, if any.
182         */
183        @Nullable
184        protected String getClientId() {
185                return this.clientId;
186        }
187
188        /**
189         * Specify an JMS ExceptionListener implementation that should be
190         * registered with the single Connection created by this factory.
191         * @see #setReconnectOnException
192         */
193        public void setExceptionListener(@Nullable ExceptionListener exceptionListener) {
194                this.exceptionListener = exceptionListener;
195        }
196
197        /**
198         * Return the JMS ExceptionListener implementation that should be registered
199         * with the single Connection created by this factory, if any.
200         */
201        @Nullable
202        protected ExceptionListener getExceptionListener() {
203                return this.exceptionListener;
204        }
205
206        /**
207         * Specify whether the single Connection should be reset (to be subsequently renewed)
208         * when a JMSException is reported by the underlying Connection.
209         * <p>Default is "false". Switch this to "true" to automatically trigger
210         * recovery based on your JMS provider's exception notifications.
211         * <p>Internally, this will lead to a special JMS ExceptionListener
212         * (this SingleConnectionFactory itself) being registered with the
213         * underlying Connection. This can also be combined with a
214         * user-specified ExceptionListener, if desired.
215         * @see #setExceptionListener
216         */
217        public void setReconnectOnException(boolean reconnectOnException) {
218                this.reconnectOnException = reconnectOnException;
219        }
220
221        /**
222         * Return whether the single Connection should be renewed when
223         * a JMSException is reported by the underlying Connection.
224         */
225        protected boolean isReconnectOnException() {
226                return this.reconnectOnException;
227        }
228
229        /**
230         * Make sure a Connection or ConnectionFactory has been set.
231         */
232        @Override
233        public void afterPropertiesSet() {
234                if (this.connection == null && getTargetConnectionFactory() == null) {
235                        throw new IllegalArgumentException("Target Connection or ConnectionFactory is required");
236                }
237        }
238
239
240        @Override
241        public Connection createConnection() throws JMSException {
242                return getSharedConnectionProxy(getConnection());
243        }
244
245        @Override
246        public Connection createConnection(String username, String password) throws JMSException {
247                throw new javax.jms.IllegalStateException(
248                                "SingleConnectionFactory does not support custom username and password");
249        }
250
251        @Override
252        public QueueConnection createQueueConnection() throws JMSException {
253                Connection con;
254                synchronized (this.connectionMonitor) {
255                        this.pubSubMode = Boolean.FALSE;
256                        con = createConnection();
257                }
258                if (!(con instanceof QueueConnection)) {
259                        throw new javax.jms.IllegalStateException(
260                                        "This SingleConnectionFactory does not hold a QueueConnection but rather: " + con);
261                }
262                return ((QueueConnection) con);
263        }
264
265        @Override
266        public QueueConnection createQueueConnection(String username, String password) throws JMSException {
267                throw new javax.jms.IllegalStateException(
268                                "SingleConnectionFactory does not support custom username and password");
269        }
270
271        @Override
272        public TopicConnection createTopicConnection() throws JMSException {
273                Connection con;
274                synchronized (this.connectionMonitor) {
275                        this.pubSubMode = Boolean.TRUE;
276                        con = createConnection();
277                }
278                if (!(con instanceof TopicConnection)) {
279                        throw new javax.jms.IllegalStateException(
280                                        "This SingleConnectionFactory does not hold a TopicConnection but rather: " + con);
281                }
282                return ((TopicConnection) con);
283        }
284
285        @Override
286        public TopicConnection createTopicConnection(String username, String password) throws JMSException {
287                throw new javax.jms.IllegalStateException(
288                                "SingleConnectionFactory does not support custom username and password");
289        }
290
291        @Override
292        public JMSContext createContext() {
293                return obtainTargetConnectionFactory().createContext();
294        }
295
296        @Override
297        public JMSContext createContext(String userName, String password) {
298                return obtainTargetConnectionFactory().createContext(userName, password);
299        }
300
301        @Override
302        public JMSContext createContext(String userName, String password, int sessionMode) {
303                return obtainTargetConnectionFactory().createContext(userName, password, sessionMode);
304        }
305
306        @Override
307        public JMSContext createContext(int sessionMode) {
308                return obtainTargetConnectionFactory().createContext(sessionMode);
309        }
310
311        private ConnectionFactory obtainTargetConnectionFactory() {
312                ConnectionFactory target = getTargetConnectionFactory();
313                Assert.state(target != null, "'targetConnectionFactory' is required");
314                return target;
315        }
316
317
318        /**
319         * Obtain an initialized shared Connection.
320         * @return the Connection (never {@code null})
321         * @throws javax.jms.JMSException if thrown by JMS API methods
322         * @see #initConnection()
323         */
324        protected Connection getConnection() throws JMSException {
325                synchronized (this.connectionMonitor) {
326                        if (this.connection == null) {
327                                initConnection();
328                        }
329                        return this.connection;
330                }
331        }
332
333        /**
334         * Initialize the underlying shared Connection.
335         * <p>Closes and reinitializes the Connection if an underlying
336         * Connection is present already.
337         * @throws javax.jms.JMSException if thrown by JMS API methods
338         * @see #prepareConnection
339         */
340        public void initConnection() throws JMSException {
341                if (getTargetConnectionFactory() == null) {
342                        throw new IllegalStateException(
343                                        "'targetConnectionFactory' is required for lazily initializing a Connection");
344                }
345                synchronized (this.connectionMonitor) {
346                        if (this.connection != null) {
347                                closeConnection(this.connection);
348                        }
349                        this.connection = doCreateConnection();
350                        prepareConnection(this.connection);
351                        if (this.startedCount > 0) {
352                                this.connection.start();
353                        }
354                        if (logger.isDebugEnabled()) {
355                                logger.debug("Established shared JMS Connection: " + this.connection);
356                        }
357                }
358        }
359
360        /**
361         * Exception listener callback that renews the underlying single Connection.
362         * @see #resetConnection()
363         */
364        @Override
365        public void onException(JMSException ex) {
366                logger.info("Encountered a JMSException - resetting the underlying JMS Connection", ex);
367                resetConnection();
368        }
369
370        /**
371         * Close the underlying shared connection.
372         * The provider of this ConnectionFactory needs to care for proper shutdown.
373         * <p>As this bean implements DisposableBean, a bean factory will
374         * automatically invoke this on destruction of its cached singletons.
375         * @see #resetConnection()
376         */
377        @Override
378        public void destroy() {
379                resetConnection();
380        }
381
382        /**
383         * Reset the underlying shared Connection, to be reinitialized on next access.
384         * @see #closeConnection
385         */
386        public void resetConnection() {
387                synchronized (this.connectionMonitor) {
388                        if (this.connection != null) {
389                                closeConnection(this.connection);
390                        }
391                        this.connection = null;
392                }
393        }
394
395        /**
396         * Create a JMS Connection via this template's ConnectionFactory.
397         * @return the new JMS Connection
398         * @throws javax.jms.JMSException if thrown by JMS API methods
399         */
400        protected Connection doCreateConnection() throws JMSException {
401                ConnectionFactory cf = getTargetConnectionFactory();
402                if (Boolean.FALSE.equals(this.pubSubMode) && cf instanceof QueueConnectionFactory) {
403                        return ((QueueConnectionFactory) cf).createQueueConnection();
404                }
405                else if (Boolean.TRUE.equals(this.pubSubMode) && cf instanceof TopicConnectionFactory) {
406                        return ((TopicConnectionFactory) cf).createTopicConnection();
407                }
408                else {
409                        return obtainTargetConnectionFactory().createConnection();
410                }
411        }
412
413        /**
414         * Prepare the given Connection before it is exposed.
415         * <p>The default implementation applies ExceptionListener and client id.
416         * Can be overridden in subclasses.
417         * @param con the Connection to prepare
418         * @throws JMSException if thrown by JMS API methods
419         * @see #setExceptionListener
420         * @see #setReconnectOnException
421         */
422        protected void prepareConnection(Connection con) throws JMSException {
423                if (getClientId() != null) {
424                        con.setClientID(getClientId());
425                }
426                if (this.aggregatedExceptionListener != null) {
427                        con.setExceptionListener(this.aggregatedExceptionListener);
428                }
429                else if (getExceptionListener() != null || isReconnectOnException()) {
430                        ExceptionListener listenerToUse = getExceptionListener();
431                        if (isReconnectOnException()) {
432                                this.aggregatedExceptionListener = new AggregatedExceptionListener();
433                                this.aggregatedExceptionListener.delegates.add(this);
434                                if (listenerToUse != null) {
435                                        this.aggregatedExceptionListener.delegates.add(listenerToUse);
436                                }
437                                listenerToUse = this.aggregatedExceptionListener;
438                        }
439                        con.setExceptionListener(listenerToUse);
440                }
441        }
442
443        /**
444         * Template method for obtaining a (potentially cached) Session.
445         * <p>The default implementation always returns {@code null}.
446         * Subclasses may override this for exposing specific Session handles,
447         * possibly delegating to {@link #createSession} for the creation of raw
448         * Session objects that will then get wrapped and returned from here.
449         * @param con the JMS Connection to operate on
450         * @param mode the Session acknowledgement mode
451         * ({@code Session.TRANSACTED} or one of the common modes)
452         * @return the Session to use, or {@code null} to indicate
453         * creation of a raw standard Session
454         * @throws JMSException if thrown by the JMS API
455         */
456        @Nullable
457        protected Session getSession(Connection con, Integer mode) throws JMSException {
458                return null;
459        }
460
461        /**
462         * Create a default Session for this ConnectionFactory,
463         * adapting to JMS 1.0.2 style queue/topic mode if necessary.
464         * @param con the JMS Connection to operate on
465         * @param mode the Session acknowledgement mode
466         * ({@code Session.TRANSACTED} or one of the common modes)
467         * @return the newly created Session
468         * @throws JMSException if thrown by the JMS API
469         */
470        protected Session createSession(Connection con, Integer mode) throws JMSException {
471                // Determine JMS API arguments...
472                boolean transacted = (mode == Session.SESSION_TRANSACTED);
473                int ackMode = (transacted ? Session.AUTO_ACKNOWLEDGE : mode);
474                // Now actually call the appropriate JMS factory method...
475                if (Boolean.FALSE.equals(this.pubSubMode) && con instanceof QueueConnection) {
476                        return ((QueueConnection) con).createQueueSession(transacted, ackMode);
477                }
478                else if (Boolean.TRUE.equals(this.pubSubMode) && con instanceof TopicConnection) {
479                        return ((TopicConnection) con).createTopicSession(transacted, ackMode);
480                }
481                else {
482                        return con.createSession(transacted, ackMode);
483                }
484        }
485
486        /**
487         * Close the given Connection.
488         * @param con the Connection to close
489         */
490        protected void closeConnection(Connection con) {
491                if (logger.isDebugEnabled()) {
492                        logger.debug("Closing shared JMS Connection: " + con);
493                }
494                try {
495                        try {
496                                if (this.startedCount > 0) {
497                                        con.stop();
498                                }
499                        }
500                        finally {
501                                con.close();
502                        }
503                }
504                catch (javax.jms.IllegalStateException ex) {
505                        logger.debug("Ignoring Connection state exception - assuming already closed: " + ex);
506                }
507                catch (Throwable ex) {
508                        logger.warn("Could not close shared JMS Connection", ex);
509                }
510        }
511
512        /**
513         * Wrap the given Connection with a proxy that delegates every method call to it
514         * but suppresses close calls. This is useful for allowing application code to
515         * handle a special framework Connection just like an ordinary Connection from a
516         * JMS ConnectionFactory.
517         * @param target the original Connection to wrap
518         * @return the wrapped Connection
519         */
520        protected Connection getSharedConnectionProxy(Connection target) {
521                List<Class<?>> classes = new ArrayList<>(3);
522                classes.add(Connection.class);
523                if (target instanceof QueueConnection) {
524                        classes.add(QueueConnection.class);
525                }
526                if (target instanceof TopicConnection) {
527                        classes.add(TopicConnection.class);
528                }
529                return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(),
530                                ClassUtils.toClassArray(classes), new SharedConnectionInvocationHandler());
531        }
532
533
534        /**
535         * Invocation handler for a cached JMS Connection proxy.
536         */
537        private class SharedConnectionInvocationHandler implements InvocationHandler {
538
539                @Nullable
540                private ExceptionListener localExceptionListener;
541
542                private boolean locallyStarted = false;
543
544                @Override
545                @Nullable
546                public Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
547                        if (method.getName().equals("equals") && args != null) {
548                                Object other = args[0];
549                                if (proxy == other) {
550                                        return true;
551                                }
552                                if (other == null || !Proxy.isProxyClass(other.getClass())) {
553                                        return false;
554                                }
555                                InvocationHandler otherHandler = Proxy.getInvocationHandler(other);
556                                return (otherHandler instanceof SharedConnectionInvocationHandler &&
557                                                factory() == ((SharedConnectionInvocationHandler) otherHandler).factory());
558                        }
559                        else if (method.getName().equals("hashCode")) {
560                                // Use hashCode of containing SingleConnectionFactory.
561                                return System.identityHashCode(factory());
562                        }
563                        else if (method.getName().equals("toString")) {
564                                return "Shared JMS Connection: " + getConnection();
565                        }
566                        else if (method.getName().equals("setClientID") && args != null) {
567                                // Handle setClientID method: throw exception if not compatible.
568                                String currentClientId = getConnection().getClientID();
569                                if (currentClientId != null && currentClientId.equals(args[0])) {
570                                        return null;
571                                }
572                                else {
573                                        throw new javax.jms.IllegalStateException(
574                                                        "setClientID call not supported on proxy for shared Connection. " +
575                                                        "Set the 'clientId' property on the SingleConnectionFactory instead.");
576                                }
577                        }
578                        else if (method.getName().equals("setExceptionListener") && args != null) {
579                                // Handle setExceptionListener method: add to the chain.
580                                synchronized (connectionMonitor) {
581                                        if (aggregatedExceptionListener != null) {
582                                                ExceptionListener listener = (ExceptionListener) args[0];
583                                                if (listener != this.localExceptionListener) {
584                                                        if (this.localExceptionListener != null) {
585                                                                aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
586                                                        }
587                                                        if (listener != null) {
588                                                                aggregatedExceptionListener.delegates.add(listener);
589                                                        }
590                                                        this.localExceptionListener = listener;
591                                                }
592                                                return null;
593                                        }
594                                        else {
595                                                throw new javax.jms.IllegalStateException(
596                                                                "setExceptionListener call not supported on proxy for shared Connection. " +
597                                                                "Set the 'exceptionListener' property on the SingleConnectionFactory instead. " +
598                                                                "Alternatively, activate SingleConnectionFactory's 'reconnectOnException' feature, " +
599                                                                "which will allow for registering further ExceptionListeners to the recovery chain.");
600                                        }
601                                }
602                        }
603                        else if (method.getName().equals("getExceptionListener")) {
604                                synchronized (connectionMonitor) {
605                                        if (this.localExceptionListener != null) {
606                                                return this.localExceptionListener;
607                                        }
608                                        else {
609                                                return getExceptionListener();
610                                        }
611                                }
612                        }
613                        else if (method.getName().equals("start")) {
614                                localStart();
615                                return null;
616                        }
617                        else if (method.getName().equals("stop")) {
618                                localStop();
619                                return null;
620                        }
621                        else if (method.getName().equals("close")) {
622                                localStop();
623                                synchronized (connectionMonitor) {
624                                        if (this.localExceptionListener != null) {
625                                                if (aggregatedExceptionListener != null) {
626                                                        aggregatedExceptionListener.delegates.remove(this.localExceptionListener);
627                                                }
628                                                this.localExceptionListener = null;
629                                        }
630                                }
631                                return null;
632                        }
633                        else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") ||
634                                        method.getName().equals("createTopicSession")) {
635                                // Default: JMS 2.0 createSession() method
636                                Integer mode = Session.AUTO_ACKNOWLEDGE;
637                                if (args != null) {
638                                        if (args.length == 1) {
639                                                // JMS 2.0 createSession(int) method
640                                                mode = (Integer) args[0];
641                                        }
642                                        else if (args.length == 2) {
643                                                // JMS 1.1 createSession(boolean, int) method
644                                                boolean transacted = (Boolean) args[0];
645                                                Integer ackMode = (Integer) args[1];
646                                                mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
647                                        }
648                                }
649                                Session session = getSession(getConnection(), mode);
650                                if (session != null) {
651                                        if (!method.getReturnType().isInstance(session)) {
652                                                String msg = "JMS Session does not implement specific domain: " + session;
653                                                try {
654                                                        session.close();
655                                                }
656                                                catch (Throwable ex) {
657                                                        logger.trace("Failed to close newly obtained JMS Session", ex);
658                                                }
659                                                throw new javax.jms.IllegalStateException(msg);
660                                        }
661                                        return session;
662                                }
663                        }
664                        try {
665                                return method.invoke(getConnection(), args);
666                        }
667                        catch (InvocationTargetException ex) {
668                                throw ex.getTargetException();
669                        }
670                }
671
672                private void localStart() throws JMSException {
673                        synchronized (connectionMonitor) {
674                                if (!this.locallyStarted) {
675                                        this.locallyStarted = true;
676                                        if (startedCount == 0 && connection != null) {
677                                                connection.start();
678                                        }
679                                        startedCount++;
680                                }
681                        }
682                }
683
684                private void localStop() throws JMSException {
685                        synchronized (connectionMonitor) {
686                                if (this.locallyStarted) {
687                                        this.locallyStarted = false;
688                                        if (startedCount == 1 && connection != null) {
689                                                connection.stop();
690                                        }
691                                        if (startedCount > 0) {
692                                                startedCount--;
693                                        }
694                                }
695                        }
696                }
697
698                private SingleConnectionFactory factory() {
699                        return SingleConnectionFactory.this;
700                }
701        }
702
703
704        /**
705         * Internal aggregated ExceptionListener for handling the internal
706         * recovery listener in combination with user-specified listeners.
707         */
708        private class AggregatedExceptionListener implements ExceptionListener {
709
710                final Set<ExceptionListener> delegates = new LinkedHashSet<>(2);
711
712                @Override
713                public void onException(JMSException ex) {
714                        // Iterate over temporary copy in order to avoid ConcurrentModificationException,
715                        // since listener invocations may in turn trigger registration of listeners...
716                        Set<ExceptionListener> copy;
717                        synchronized (connectionMonitor) {
718                                copy = new LinkedHashSet<>(this.delegates);
719                        }
720                        for (ExceptionListener listener : copy) {
721                                listener.onException(ex);
722                        }
723                }
724        }
725
726}