001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.connection;
018
019import javax.jms.Connection;
020import javax.jms.ConnectionFactory;
021import javax.jms.JMSException;
022import javax.jms.QueueConnection;
023import javax.jms.QueueConnectionFactory;
024import javax.jms.QueueSession;
025import javax.jms.Session;
026import javax.jms.TopicConnection;
027import javax.jms.TopicConnectionFactory;
028import javax.jms.TopicSession;
029
030import org.apache.commons.logging.Log;
031import org.apache.commons.logging.LogFactory;
032
033import org.springframework.transaction.support.ResourceHolderSynchronization;
034import org.springframework.transaction.support.TransactionSynchronizationManager;
035import org.springframework.util.Assert;
036
037/**
038 * Helper class for managing a JMS {@link javax.jms.ConnectionFactory}, in particular
039 * for obtaining transactional JMS resources for a given ConnectionFactory.
040 *
041 * <p>Mainly for internal use within the framework. Used by
042 * {@link org.springframework.jms.core.JmsTemplate} as well as
043 * {@link org.springframework.jms.listener.DefaultMessageListenerContainer}.
044 *
045 * @author Juergen Hoeller
046 * @since 2.0
047 * @see SmartConnectionFactory
048 */
049public abstract class ConnectionFactoryUtils {
050
051        private static final Log logger = LogFactory.getLog(ConnectionFactoryUtils.class);
052
053
054        /**
055         * Release the given Connection, stopping it (if necessary) and eventually closing it.
056         * <p>Checks {@link SmartConnectionFactory#shouldStop}, if available.
057         * This is essentially a more sophisticated version of
058         * {@link org.springframework.jms.support.JmsUtils#closeConnection}.
059         * @param con the Connection to release
060         * (if this is {@code null}, the call will be ignored)
061         * @param cf the ConnectionFactory that the Connection was obtained from
062         * (may be {@code null})
063         * @param started whether the Connection might have been started by the application
064         * @see SmartConnectionFactory#shouldStop
065         * @see org.springframework.jms.support.JmsUtils#closeConnection
066         */
067        public static void releaseConnection(Connection con, ConnectionFactory cf, boolean started) {
068                if (con == null) {
069                        return;
070                }
071                if (started && cf instanceof SmartConnectionFactory && ((SmartConnectionFactory) cf).shouldStop(con)) {
072                        try {
073                                con.stop();
074                        }
075                        catch (Throwable ex) {
076                                logger.debug("Could not stop JMS Connection before closing it", ex);
077                        }
078                }
079                try {
080                        con.close();
081                }
082                catch (Throwable ex) {
083                        logger.debug("Could not close JMS Connection", ex);
084                }
085        }
086
087        /**
088         * Return the innermost target Session of the given Session. If the given
089         * Session is a proxy, it will be unwrapped until a non-proxy Session is
090         * found. Otherwise, the passed-in Session will be returned as-is.
091         * @param session the Session proxy to unwrap
092         * @return the innermost target Session, or the passed-in one if no proxy
093         * @see SessionProxy#getTargetSession()
094         */
095        public static Session getTargetSession(Session session) {
096                Session sessionToUse = session;
097                while (sessionToUse instanceof SessionProxy) {
098                        sessionToUse = ((SessionProxy) sessionToUse).getTargetSession();
099                }
100                return sessionToUse;
101        }
102
103
104
105        /**
106         * Determine whether the given JMS Session is transactional, that is,
107         * bound to the current thread by Spring's transaction facilities.
108         * @param session the JMS Session to check
109         * @param cf the JMS ConnectionFactory that the Session originated from
110         * @return whether the Session is transactional
111         */
112        public static boolean isSessionTransactional(Session session, ConnectionFactory cf) {
113                if (session == null || cf == null) {
114                        return false;
115                }
116                JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(cf);
117                return (resourceHolder != null && resourceHolder.containsSession(session));
118        }
119
120
121        /**
122         * Obtain a JMS Session that is synchronized with the current transaction, if any.
123         * @param cf the ConnectionFactory to obtain a Session for
124         * @param existingCon the existing JMS Connection to obtain a Session for
125         * (may be {@code null})
126         * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
127         * that is synchronized with a Spring-managed transaction (where the main transaction
128         * might be a JDBC-based one for a specific DataSource, for example), with the JMS
129         * transaction committing right after the main transaction. If not allowed, the given
130         * ConnectionFactory needs to handle transaction enlistment underneath the covers.
131         * @return the transactional Session, or {@code null} if none found
132         * @throws JMSException in case of JMS failure
133         */
134        public static Session getTransactionalSession(final ConnectionFactory cf,
135                        final Connection existingCon, final boolean synchedLocalTransactionAllowed)
136                        throws JMSException {
137
138                return doGetTransactionalSession(cf, new ResourceFactory() {
139                        @Override
140                        public Session getSession(JmsResourceHolder holder) {
141                                return holder.getSession(Session.class, existingCon);
142                        }
143                        @Override
144                        public Connection getConnection(JmsResourceHolder holder) {
145                                return (existingCon != null ? existingCon : holder.getConnection());
146                        }
147                        @Override
148                        public Connection createConnection() throws JMSException {
149                                return cf.createConnection();
150                        }
151                        @Override
152                        public Session createSession(Connection con) throws JMSException {
153                                return con.createSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
154                        }
155                        @Override
156                        public boolean isSynchedLocalTransactionAllowed() {
157                                return synchedLocalTransactionAllowed;
158                        }
159                }, true);
160        }
161
162        /**
163         * Obtain a JMS QueueSession that is synchronized with the current transaction, if any.
164         * <p>Mainly intended for use with the JMS 1.0.2 API.
165         * @param cf the ConnectionFactory to obtain a Session for
166         * @param existingCon the existing JMS Connection to obtain a Session for
167         * (may be {@code null})
168         * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
169         * that is synchronized with a Spring-managed transaction (where the main transaction
170         * might be a JDBC-based one for a specific DataSource, for example), with the JMS
171         * transaction committing right after the main transaction. If not allowed, the given
172         * ConnectionFactory needs to handle transaction enlistment underneath the covers.
173         * @return the transactional Session, or {@code null} if none found
174         * @throws JMSException in case of JMS failure
175         */
176        public static QueueSession getTransactionalQueueSession(final QueueConnectionFactory cf,
177                        final QueueConnection existingCon, final boolean synchedLocalTransactionAllowed)
178                        throws JMSException {
179
180                return (QueueSession) doGetTransactionalSession(cf, new ResourceFactory() {
181                        @Override
182                        public Session getSession(JmsResourceHolder holder) {
183                                return holder.getSession(QueueSession.class, existingCon);
184                        }
185                        @Override
186                        public Connection getConnection(JmsResourceHolder holder) {
187                                return (existingCon != null ? existingCon : holder.getConnection(QueueConnection.class));
188                        }
189                        @Override
190                        public Connection createConnection() throws JMSException {
191                                return cf.createQueueConnection();
192                        }
193                        @Override
194                        public Session createSession(Connection con) throws JMSException {
195                                return ((QueueConnection) con).createQueueSession(synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
196                        }
197                        @Override
198                        public boolean isSynchedLocalTransactionAllowed() {
199                                return synchedLocalTransactionAllowed;
200                        }
201                }, true);
202        }
203
204        /**
205         * Obtain a JMS TopicSession that is synchronized with the current transaction, if any.
206         * <p>Mainly intended for use with the JMS 1.0.2 API.
207         * @param cf the ConnectionFactory to obtain a Session for
208         * @param existingCon the existing JMS Connection to obtain a Session for
209         * (may be {@code null})
210         * @param synchedLocalTransactionAllowed whether to allow for a local JMS transaction
211         * that is synchronized with a Spring-managed transaction (where the main transaction
212         * might be a JDBC-based one for a specific DataSource, for example), with the JMS
213         * transaction committing right after the main transaction. If not allowed, the given
214         * ConnectionFactory needs to handle transaction enlistment underneath the covers.
215         * @return the transactional Session, or {@code null} if none found
216         * @throws JMSException in case of JMS failure
217         */
218        public static TopicSession getTransactionalTopicSession(final TopicConnectionFactory cf,
219                        final TopicConnection existingCon, final boolean synchedLocalTransactionAllowed)
220                        throws JMSException {
221
222                return (TopicSession) doGetTransactionalSession(cf, new ResourceFactory() {
223                        @Override
224                        public Session getSession(JmsResourceHolder holder) {
225                                return holder.getSession(TopicSession.class, existingCon);
226                        }
227                        @Override
228                        public Connection getConnection(JmsResourceHolder holder) {
229                                return (existingCon != null ? existingCon : holder.getConnection(TopicConnection.class));
230                        }
231                        @Override
232                        public Connection createConnection() throws JMSException {
233                                return cf.createTopicConnection();
234                        }
235                        @Override
236                        public Session createSession(Connection con) throws JMSException {
237                                return ((TopicConnection) con).createTopicSession(
238                                                synchedLocalTransactionAllowed, Session.AUTO_ACKNOWLEDGE);
239                        }
240                        @Override
241                        public boolean isSynchedLocalTransactionAllowed() {
242                                return synchedLocalTransactionAllowed;
243                        }
244                }, true);
245        }
246
247        /**
248         * Obtain a JMS Session that is synchronized with the current transaction, if any.
249         * <p>This {@code doGetTransactionalSession} variant always starts the underlying
250         * JMS Connection, assuming that the Session will be used for receiving messages.
251         * @param connectionFactory the JMS ConnectionFactory to bind for
252         * (used as TransactionSynchronizationManager key)
253         * @param resourceFactory the ResourceFactory to use for extracting or creating
254         * JMS resources
255         * @return the transactional Session, or {@code null} if none found
256         * @throws JMSException in case of JMS failure
257         * @see #doGetTransactionalSession(javax.jms.ConnectionFactory, ResourceFactory, boolean)
258         */
259        public static Session doGetTransactionalSession(
260                        ConnectionFactory connectionFactory, ResourceFactory resourceFactory) throws JMSException {
261
262                return doGetTransactionalSession(connectionFactory, resourceFactory, true);
263        }
264
265        /**
266         * Obtain a JMS Session that is synchronized with the current transaction, if any.
267         * @param connectionFactory the JMS ConnectionFactory to bind for
268         * (used as TransactionSynchronizationManager key)
269         * @param resourceFactory the ResourceFactory to use for extracting or creating
270         * JMS resources
271         * @param startConnection whether the underlying JMS Connection approach should be
272         * started in order to allow for receiving messages. Note that a reused Connection
273         * may already have been started before, even if this flag is {@code false}.
274         * @return the transactional Session, or {@code null} if none found
275         * @throws JMSException in case of JMS failure
276         */
277        public static Session doGetTransactionalSession(
278                        ConnectionFactory connectionFactory, ResourceFactory resourceFactory, boolean startConnection)
279                        throws JMSException {
280
281                Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
282                Assert.notNull(resourceFactory, "ResourceFactory must not be null");
283
284                JmsResourceHolder resourceHolder =
285                                (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
286                if (resourceHolder != null) {
287                        Session session = resourceFactory.getSession(resourceHolder);
288                        if (session != null) {
289                                if (startConnection) {
290                                        Connection con = resourceFactory.getConnection(resourceHolder);
291                                        if (con != null) {
292                                                con.start();
293                                        }
294                                }
295                                return session;
296                        }
297                        if (resourceHolder.isFrozen()) {
298                                return null;
299                        }
300                }
301                if (!TransactionSynchronizationManager.isSynchronizationActive()) {
302                        return null;
303                }
304                JmsResourceHolder resourceHolderToUse = resourceHolder;
305                if (resourceHolderToUse == null) {
306                        resourceHolderToUse = new JmsResourceHolder(connectionFactory);
307                }
308                Connection con = resourceFactory.getConnection(resourceHolderToUse);
309                Session session = null;
310                try {
311                        boolean isExistingCon = (con != null);
312                        if (!isExistingCon) {
313                                con = resourceFactory.createConnection();
314                                resourceHolderToUse.addConnection(con);
315                        }
316                        session = resourceFactory.createSession(con);
317                        resourceHolderToUse.addSession(session, con);
318                        if (startConnection) {
319                                con.start();
320                        }
321                }
322                catch (JMSException ex) {
323                        if (session != null) {
324                                try {
325                                        session.close();
326                                }
327                                catch (Throwable ex2) {
328                                        // ignore
329                                }
330                        }
331                        if (con != null) {
332                                try {
333                                        con.close();
334                                }
335                                catch (Throwable ex2) {
336                                        // ignore
337                                }
338                        }
339                        throw ex;
340                }
341                if (resourceHolderToUse != resourceHolder) {
342                        TransactionSynchronizationManager.registerSynchronization(
343                                        new JmsResourceSynchronization(resourceHolderToUse, connectionFactory,
344                                                        resourceFactory.isSynchedLocalTransactionAllowed()));
345                        resourceHolderToUse.setSynchronizedWithTransaction(true);
346                        TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolderToUse);
347                }
348                return session;
349        }
350
351
352        /**
353         * Callback interface for resource creation.
354         * Serving as argument for the {@code doGetTransactionalSession} method.
355         */
356        public interface ResourceFactory {
357
358                /**
359                 * Fetch an appropriate Session from the given JmsResourceHolder.
360                 * @param holder the JmsResourceHolder
361                 * @return an appropriate Session fetched from the holder,
362                 * or {@code null} if none found
363                 */
364                Session getSession(JmsResourceHolder holder);
365
366                /**
367                 * Fetch an appropriate Connection from the given JmsResourceHolder.
368                 * @param holder the JmsResourceHolder
369                 * @return an appropriate Connection fetched from the holder,
370                 * or {@code null} if none found
371                 */
372                Connection getConnection(JmsResourceHolder holder);
373
374                /**
375                 * Create a new JMS Connection for registration with a JmsResourceHolder.
376                 * @return the new JMS Connection
377                 * @throws JMSException if thrown by JMS API methods
378                 */
379                Connection createConnection() throws JMSException;
380
381                /**
382                 * Create a new JMS Session for registration with a JmsResourceHolder.
383                 * @param con the JMS Connection to create a Session for
384                 * @return the new JMS Session
385                 * @throws JMSException if thrown by JMS API methods
386                 */
387                Session createSession(Connection con) throws JMSException;
388
389                /**
390                 * Return whether to allow for a local JMS transaction that is synchronized with
391                 * a Spring-managed transaction (where the main transaction might be a JDBC-based
392                 * one for a specific DataSource, for example), with the JMS transaction
393                 * committing right after the main transaction.
394                 * @return whether to allow for synchronizing a local JMS transaction
395                 */
396                boolean isSynchedLocalTransactionAllowed();
397        }
398
399
400        /**
401         * Callback for resource cleanup at the end of a non-native JMS transaction
402         * (e.g. when participating in a JtaTransactionManager transaction).
403         * @see org.springframework.transaction.jta.JtaTransactionManager
404         */
405        private static class JmsResourceSynchronization extends ResourceHolderSynchronization<JmsResourceHolder, Object> {
406
407                private final boolean transacted;
408
409                public JmsResourceSynchronization(JmsResourceHolder resourceHolder, Object resourceKey, boolean transacted) {
410                        super(resourceHolder, resourceKey);
411                        this.transacted = transacted;
412                }
413
414                @Override
415                protected boolean shouldReleaseBeforeCompletion() {
416                        return !this.transacted;
417                }
418
419                @Override
420                protected void processResourceAfterCommit(JmsResourceHolder resourceHolder) {
421                        try {
422                                resourceHolder.commitAll();
423                        }
424                        catch (JMSException ex) {
425                                throw new SynchedLocalTransactionFailedException("Local JMS transaction failed to commit", ex);
426                        }
427                }
428
429                @Override
430                protected void releaseResource(JmsResourceHolder resourceHolder, Object resourceKey) {
431                        resourceHolder.closeAll();
432                }
433        }
434
435}