001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.connection;
018
019import javax.jms.Connection;
020import javax.jms.ConnectionFactory;
021import javax.jms.JMSException;
022import javax.jms.QueueConnection;
023import javax.jms.QueueConnectionFactory;
024import javax.jms.QueueSession;
025import javax.jms.Session;
026import javax.jms.TopicConnection;
027import javax.jms.TopicConnectionFactory;
028import javax.jms.TopicSession;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032
033import org.springframework.lang.Nullable;
034import org.springframework.transaction.support.ResourceHolderSynchronization;
035import org.springframework.transaction.support.TransactionSynchronizationManager;
036import org.springframework.util.Assert;
037
038/**
039 * Helper class for managing a JMS {@link javax.jms.ConnectionFactory}, in particular
040 * for obtaining transactional JMS resources for a given ConnectionFactory.
041 *
042 * <p>Mainly for internal use within the framework. Used by
043 * {@link org.springframework.jms.core.JmsTemplate} as well as
044 * {@link org.springframework.jms.listener.DefaultMessageListenerContainer}.
045 *
046 * @author Juergen Hoeller
047 * @since 2.0
048 * @see SmartConnectionFactory
049 */
050public abstract class ConnectionFactoryUtils {
051
052        private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class);
053
054
055        /**
056         * Release the given Connection, stopping it (if necessary) and eventually closing it.
057         * <p>Checks {@link SmartConnectionFactory#shouldStop}, if available.
058         * This is essentially a more sophisticated version of
059         * {@link org.springframework.jms.support.JmsUtils#closeConnection}.
060         * @param con the Connection to release
061         * (if this is {@code null}, the call will be ignored)
062         * @param cf the ConnectionFactory that the Connection was obtained from
063         * (may be {@code null})
064         * @param started whether the Connection might have been started by the application
065         * @see SmartConnectionFactory#shouldStop
066         * @see org.springframework.jms.support.JmsUtils#closeConnection
067         */
068        public static void releaseConnection(@Nullable Connection con, @Nullable ConnectionFactory cf, boolean started) {
069                if (con == null) {
070                        return;
071                }
072                if (started && cf instanceof SmartConnectionFactory && ((SmartConnectionFactory) cf).shouldStop(con)) {
073                        try {
074                                con.stop();
075                        }
076                        catch (Throwable ex) {
077                                logger.debug("Could not stop JMS Connection before closing it", ex);
078                        }
079                }
080                try {
081                        con.close();
082                }
083                catch (Throwable ex) {
084                        logger.debug("Could not close JMS Connection", ex);
085                }
086        }
087
088        /**
089         * Return the innermost target Session of the given Session. If the given
090         * Session is a proxy, it will be unwrapped until a non-proxy Session is
091         * found. Otherwise, the passed-in Session will be returned as-is.
092         * @param session the Session proxy to unwrap
093         * @return the innermost target Session, or the passed-in one if no proxy
094         * @see SessionProxy#getTargetSession()
095         */
096        public static Session getTargetSession(Session session) {
097                Session sessionToUse = session;
098                while (sessionToUse instanceof SessionProxy) {
099                        sessionToUse = ((SessionProxy) sessionToUse).getTargetSession();
100                }
101                return sessionToUse;
102        }
103
104
105
106        /**
107         * Determine whether the given JMS Session is transactional, that is,
108         * bound to the current thread by Spring's transaction facilities.
109         * @param session the JMS Session to check
110         * @param cf the JMS ConnectionFactory that the Session originated from
111         * @return whether the Session is transactional
112         */
113        public static boolean isSessionTransactional(@Nullable Session session, @Nullable ConnectionFactory cf) {
114                if (session == null || cf == null) {
115                        return false;
116                }
117                JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(cf);
118                return (resourceHolder != null && resourceHolder.containsSession(session));
119        }
120
121
122        /**
123         * Obtain a JMS Session that is synchronized with the current transaction, if any.
124         * @param cf the ConnectionFactory to obtain a Session for
125         * @param existingCon the existing JMS Connection to obtain a Session for
126         * (may be {@code null})
127         * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
128         * that is synchronized with a Spring-managed transaction (where the main transaction
129         * might be a JDBC-based one for a specific DataSource, for example), with the JMS
130         * transaction committing right after the main transaction. If not allowed, the given
131         * ConnectionFactory needs to handle transaction enlistment underneath the covers.
132         * @return the transactional Session, or {@code null} if none found
133         * @throws JMSException in case of JMS failure
134         */
135        @Nullable
136        public static Session getTransactionalSession(final ConnectionFactory cf,
137                        @Nullable final Connection existingCon, final boolean synchedLocalTransactionAllowed)
138                        throws JMSException {
139
140                return doGetTransactionalSession(cf, new ResourceFactory() {
141                        @Override
142                        @Nullable
143                        public Session getSession(JmsResourceHolder holder) {
144                                return holder.getSession(Session.class, existingCon);
145                        }
146                        @Override
147                        @Nullable
148                        public Connection getConnection(JmsResourceHolder holder) {
149                                return (existingCon != null ? existingCon : holder.getConnection());
150                        }
151                        @Override
152                        public Connection createConnection() throws JMSException {
153                                return cf.createConnection();
154                        }
155                        @Override
156                        public Session createSession(Connection con) throws JMSException {
157                                return con.createSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
158                        }
159                        @Override
160                        public boolean isSynchedLocalTransactionAllowed() {
161                                return synchedLocalTransactionAllowed;
162                        }
163                }, true);
164        }
165
166        /**
167         * Obtain a JMS QueueSession that is synchronized with the current transaction, if any.
168         * <p>Mainly intended for use with the JMS 1.0.2 API.
169         * @param cf the ConnectionFactory to obtain a Session for
170         * @param existingCon the existing JMS Connection to obtain a Session for
171         * (may be {@code null})
172         * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
173         * that is synchronized with a Spring-managed transaction (where the main transaction
174         * might be a JDBC-based one for a specific DataSource, for example), with the JMS
175         * transaction committing right after the main transaction. If not allowed, the given
176         * ConnectionFactory needs to handle transaction enlistment underneath the covers.
177         * @return the transactional Session, or {@code null} if none found
178         * @throws JMSException in case of JMS failure
179         */
180        @Nullable
181        public static QueueSession getTransactionalQueueSession(final QueueConnectionFactory cf,
182                        @Nullable final QueueConnection existingCon, final boolean synchedLocalTransactionAllowed)
183                        throws JMSException {
184
185                return (QueueSession) doGetTransactionalSession(cf, new ResourceFactory() {
186                        @Override
187                        @Nullable
188                        public Session getSession(JmsResourceHolder holder) {
189                                return holder.getSession(QueueSession.class, existingCon);
190                        }
191                        @Override
192                        @Nullable
193                        public Connection getConnection(JmsResourceHolder holder) {
194                                return (existingCon != null ? existingCon : holder.getConnection(QueueConnection.class));
195                        }
196                        @Override
197                        public Connection createConnection() throws JMSException {
198                                return cf.createQueueConnection();
199                        }
200                        @Override
201                        public Session createSession(Connection con) throws JMSException {
202                                return ((QueueConnection) con).createQueueSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
203                        }
204                        @Override
205                        public boolean isSynchedLocalTransactionAllowed() {
206                                return synchedLocalTransactionAllowed;
207                        }
208                }, true);
209        }
210
211        /**
212         * Obtain a JMS TopicSession that is synchronized with the current transaction, if any.
213         * <p>Mainly intended for use with the JMS 1.0.2 API.
214         * @param cf the ConnectionFactory to obtain a Session for
215         * @param existingCon the existing JMS Connection to obtain a Session for
216         * (may be {@code null})
217         * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
218         * that is synchronized with a Spring-managed transaction (where the main transaction
219         * might be a JDBC-based one for a specific DataSource, for example), with the JMS
220         * transaction committing right after the main transaction. If not allowed, the given
221         * ConnectionFactory needs to handle transaction enlistment underneath the covers.
222         * @return the transactional Session, or {@code null} if none found
223         * @throws JMSException in case of JMS failure
224         */
225        @Nullable
226        public static TopicSession getTransactionalTopicSession(final TopicConnectionFactory cf,
227                        @Nullable final TopicConnection existingCon, final boolean synchedLocalTransactionAllowed)
228                        throws JMSException {
229
230                return (TopicSession) doGetTransactionalSession(cf, new ResourceFactory() {
231                        @Override
232                        @Nullable
233                        public Session getSession(JmsResourceHolder holder) {
234                                return holder.getSession(TopicSession.class, existingCon);
235                        }
236                        @Override
237                        @Nullable
238                        public Connection getConnection(JmsResourceHolder holder) {
239                                return (existingCon != null ? existingCon : holder.getConnection(TopicConnection.class));
240                        }
241                        @Override
242                        public Connection createConnection() throws JMSException {
243                                return cf.createTopicConnection();
244                        }
245                        @Override
246                        public Session createSession(Connection con) throws JMSException {
247                                return ((TopicConnection) con).createTopicSession(
248                                                synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
249                        }
250                        @Override
251                        public boolean isSynchedLocalTransactionAllowed() {
252                                return synchedLocalTransactionAllowed;
253                        }
254                }, true);
255        }
256
257        /**
258         * Obtain a JMS Session that is synchronized with the current transaction, if any.
259         * <p>This {@code doGetTransactionalSession} variant always starts the underlying
260         * JMS Connection, assuming that the Session will be used for receiving messages.
261         * @param connectionFactory the JMS ConnectionFactory to bind for
262         * (used as TransactionSynchronizationManager key)
263         * @param resourceFactory the ResourceFactory to use for extracting or creating
264         * JMS resources
265         * @return the transactional Session, or {@code null} if none found
266         * @throws JMSException in case of JMS failure
267         * @see #doGetTransactionalSession(javax.jms.ConnectionFactory, ResourceFactory, boolean)
268         */
269        @Nullable
270        public static Session doGetTransactionalSession(
271                        ConnectionFactory connectionFactory, ResourceFactory resourceFactory) throws JMSException {
272
273                return doGetTransactionalSession(connectionFactory, resourceFactory, true);
274        }
275
276        /**
277         * Obtain a JMS Session that is synchronized with the current transaction, if any.
278         * @param connectionFactory the JMS ConnectionFactory to bind for
279         * (used as TransactionSynchronizationManager key)
280         * @param resourceFactory the ResourceFactory to use for extracting or creating
281         * JMS resources
282         * @param startConnection whether the underlying JMS Connection approach should be
283         * started in order to allow for receiving messages. Note that a reused Connection
284         * may already have been started before, even if this flag is {@code false}.
285         * @return the transactional Session, or {@code null} if none found
286         * @throws JMSException in case of JMS failure
287         */
288        @Nullable
289        public static Session doGetTpan class="sourceLineNo">290                        ConnectionFactory connectionFactory, ResourceFactory resourceFactory, boolean startConnection)
291                        throws JMSException {
292
293                Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
294                Assert.notNull(resourceFactory, "ResourceFactory must not be null");
295
296                JmsResourceHolder resourceHolder =
297                                (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
298                if (resourceHolder != null) {
299                        Session session = resourceFactory.getSession(resourceHolder);
300                        if (session != null) {
301                                if (startConnection) {
302                                        Connection con = resourceFactory.getConnection(resourceHolder);
303                                        if (con != null) {
304                                                con.start();
305                                        }
306                                }
307                                return session;
308                        }
309                        if (resourceHolder.isFrozen()) {
310                                return null;
311                        }
312                }
313                if (!TransactionSynchronizationManager.isSynchronizationActive()) {
314                        return null;
315                }
316                JmsResourceHolder resourceHolderToUse = resourceHolder;
317                if (resourceHolderToUse == null) {
318                        resourceHolderToUse = new JmsResourceHolder(connectionFactory);
319                }
320                Connection con = resourceFactory.getConnection(resourceHolderToUse);
321                Session session = null;
322                try {
323                        boolean isExistingCon = (con != null);
324                        if (!isExistingCon) {
325                                con = resourceFactory.createConnection();
326                                resourceHolderToUse.addConnection(con);
327                        }
328                        session = resourceFactory.createSession(con);
329                        resourceHolderToUse.addSession(session, con);
330                        if (startConnection) {
331                                con.start();
332                        }
333                }
334                catch (JMSException ex) {
335                        if (session != null) {
336                                try {
337                                        session.close();
338                                }
339                                catch (Throwable ex2) {
340                                        // ignore
341                                }
342                        }
343                        if (con != null) {
344                                try {
345                                        con.close();
346                                }
347                                catch (Throwable ex2) {
348                                        // ignore
349                                }
350                        }
351                        throw ex;
352                }
353                if (resourceHolderToUse != resourceHolder) {
354                        TransactionSynchronizationManager.registerSynchronization(
355                                        new JmsResourceSynchronization(resourceHolderToUse, connectionFactory,
356                                                        resourceFactory.isSynchedLocalTransactionAllowed()));
357                        resourceHolderToUse.setSynchronizedWithTransaction(true);
358                        TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse);
359                }
360                return session;
361        }
362
363
364        /**
365         * Callback interface for resource creation.
366         * Serving as argument for the {@code doGetTransactionalSession} method.
367         */
368        public interface ResourceFactory {
369
370                /**
371                 * Fetch an appropriate Session from the given JmsResourceHolder.
372                 * @param holder the JmsResourceHolder
373                 * @return an appropriate Session fetched from the holder,
374                 * or {@code null} if none found
375                 */
376                @Nullable
377                Session getSession(JmsResourceHolder holder);
378
379                /**
380                 * Fetch an appropriate Connection from the given JmsResourceHolder.
381                 * @param holder the JmsResourceHolder
382                 * @return an appropriate Connection fetched from the holder,
383                 * or {@code null} if none found
384                 */
385                @Nullable
386                Connection getConnection(JmsResourceHolder holder);
387
388                /**
389                 * Create a new JMS Connection for registration with a JmsResourceHolder.
390                 * @return the new JMS Connection
391                 * @throws JMSException if thrown by JMS API methods
392                 */
393                Connection createConnection() throws JMSException;
394
395                /**
396                 * Create a new JMS Session for registration with a JmsResourceHolder.
397                 * @param con the JMS Connection to create a Session for
398                 * @return the new JMS Session
399                 * @throws JMSException if thrown by JMS API methods
400                 */
401                Session createSession(Connection con) throws JMSException;
402
403                /**
404                 * Return whether to allow for a local JMS transaction that is synchronized with
405                 * a Spring-managed transaction (where the main transaction might be a JDBC-based
406                 * one for a specific DataSource, for example), with the JMS transaction
407                 * committing right after the main transaction.
408                 * @return whether to allow for synchronizing a local JMS transaction
409                 */
410                boolean isSynchedLocalTransactionAllowed();
411        }
412
413
414        /**
415         * Callback for resource cleanup at the end of a non-native JMS transaction
416         * (e.g. when participating in a JtaTransactionManager transaction).
417         * @see org.springframework.transaction.jta.JtaTransactionManager
418         */
419        private static class JmsResourceSynchronization extends ResourceHolderSynchronization<JmsResourceHolder, Object> {
420
421                private final boolean transacted;
422
423                public JmsResourceSynchronization(JmsResourceHolder resourceHolder, Object resourceKey, boolean transacted) {
424                        super(resourceHolder, resourceKey);
425                        this.transacted = transacted;
426                }
427
428                @Override
429                protected boolean shouldReleaseBeforeCompletion() {
430                        return !this.transacted;
431                }
432
433                @Override
434                protected void processResourceAfterCommit(JmsResourceHolder resourceHolder) {
435                        try {
436                                resourceHolder.commitAll();
437                        }
438                        catch (JMSException ex) {
439                                throw new SynchedLocalTransactionFailedException("Local JMS transaction failed to commit", ex);
440                        }
441                }
442
443                @Override
444                protected void releaseResource(JmsResourceHolder resourceHolder, Object resourceKey) {
445                        resourceHolder.closeAll();
446                }
447        }
448
449}