001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.connection;
018
019import javax.jms.Connection;
020import javax.jms.ConnectionFactory;
021import javax.jms.JMSException;
022import javax.jms.Session;
023import javax.jms.TransactionRolledBackException;
024
025import org.springframework.beans.factory.InitializingBean;
026import org.springframework.lang.Nullable;
027import org.springframework.transaction.CannotCreateTransactionException;
028import org.springframework.transaction.InvalidIsolationLevelException;
029import org.springframework.transaction.TransactionDefinition;
030import org.springframework.transaction.TransactionSystemException;
031import org.springframework.transaction.UnexpectedRollbackException;
032import org.springframework.transaction.support.AbstractPlatformTransactionManager;
033import org.springframework.transaction.support.DefaultTransactionStatus;
034import org.springframework.transaction.support.ResourceTransactionManager;
035import org.springframework.transaction.support.SmartTransactionObject;
036import org.springframework.transaction.support.TransactionSynchronizationManager;
037import org.springframework.util.Assert;
038
039/**
040 * {@link org.springframework.transaction.PlatformTransactionManager} implementation
041 * for a single JMS {@link javax.jms.ConnectionFactory}. Binds a JMS
042 * Connection/Session pair from the specified ConnectionFactory to the thread,
043 * potentially allowing for one thread-bound Session per ConnectionFactory.
044 *
045 * <p>This local strategy is an alternative to executing JMS operations within
046 * JTA transactions. Its advantage is that it is able to work in any environment,
047 * for example a standalone application or a test suite, with any message broker
048 * as target. However, this strategy is <i>not</i> able to provide XA transactions,
049 * for example in order to share transactions between messaging and database access.
050 * A full JTA/XA setup is required for XA transactions, typically using Spring's
051 * {@link org.springframework.transaction.jta.JtaTransactionManager} as strategy.
052 *
053 * <p>Application code is required to retrieve the transactional JMS Session via
054 * {@link ConnectionFactoryUtils#getTransactionalSession} instead of a standard
055 * Java EE-style {@link ConnectionFactory#createConnection()} call with subsequent
056 * Session creation. Spring's {@link org.springframework.jms.core.JmsTemplate}
057 * will autodetect a thread-bound Session and automatically participate in it.
058 *
059 * <p>Alternatively, you can allow application code to work with the standard
060 * Java EE-style lookup pattern on a ConnectionFactory, for example for legacy code
061 * that is not aware of Spring at all. In that case, define a
062 * {@link TransactionAwareConnectionFactoryProxy} for your target ConnectionFactory,
063 * which will automatically participate in Spring-managed transactions.
064 *
065 * <p><b>The use of {@link CachingConnectionFactory} as a target for this
066 * transaction manager is strongly recommended.</b> CachingConnectionFactory
067 * uses a single JMS Connection for all JMS access in order to avoid the overhead
068 * of repeated Connection creation, as well as maintaining a cache of Sessions.
069 * Each transaction will then share the same JMS Connection, while still using
070 * its own individual JMS Session.
071 *
072 * <p>The use of a <i>raw</i> target ConnectionFactory would not only be inefficient
073 * because of the lack of resource reuse. It might also lead to strange effects
074 * when your JMS driver doesn't accept {@code MessageProducer.close()} calls
075 * and/or {@code MessageConsumer.close()} calls before {@code Session.commit()},
076 * with the latter supposed to commit all the messages that have been sent through the
077 * producer handle and received through the consumer handle. As a safe general solution,
078 * always pass in a {@link CachingConnectionFactory} into this transaction manager's
079 * {@link #setConnectionFactory "connectionFactory"} property.
080 *
081 * <p>Transaction synchronization is turned off by default, as this manager might
082 * be used alongside a datastore-based Spring transaction manager such as the
083 * JDBC {@link org.springframework.jdbc.datasource.DataSourceTransactionManager},
084 * which has stronger needs for synchronization.
085 *
086 * @author Juergen Hoeller
087 * @since 1.1
088 * @see ConnectionFactoryUtils#getTransactionalSession
089 * @see TransactionAwareConnectionFactoryProxy
090 * @see org.springframework.jms.core.JmsTemplate
091 */
092@SuppressWarnings("serial")
093public class JmsTransactionManager extends AbstractPlatformTransactionManager
094                implements ResourceTransactionManager, InitializingBean {
095
096        @Nullable
097        private ConnectionFactory connectionFactory;
098
099        private boolean lazyResourceRetrieval = false;
100
101
102        /**
103         * Create a new JmsTransactionManager for bean-style usage.
104         * <p>Note: The ConnectionFactory has to be set before using the instance.
105         * This constructor can be used to prepare a JmsTemplate via a BeanFactory,
106         * typically setting the ConnectionFactory via setConnectionFactory.
107         * <p>Turns off transaction synchronization by default, as this manager might
108         * be used alongside a datastore-based Spring transaction manager like
109         * DataSourceTransactionManager, which has stronger needs for synchronization.
110         * Only one manager is allowed to drive synchronization at any point of time.
111         * @see #setConnectionFactory
112         * @see #setTransactionSynchronization
113         */
114        public JmsTransactionManager() {
115                setTransactionSynchronization(SYNCHRONIZATION_NEVER);
116        }
117
118        /**
119         * Create a new JmsTransactionManager, given a ConnectionFactory.
120         * @param connectionFactory the ConnectionFactory to obtain connections from
121         */
122        public JmsTransactionManager(ConnectionFactory connectionFactory) {
123                this();
124                setConnectionFactory(connectionFactory);
125                afterPropertiesSet();
126        }
127
128
129        /**
130         * Set the JMS ConnectionFactory that this instance should manage transactions for.
131         */
132        public void setConnectionFactory(@Nullable ConnectionFactory cf) {
133                if (cf instanceof TransactionAwareConnectionFactoryProxy) {
134                        // If we got a TransactionAwareConnectionFactoryProxy, we need to perform transactions
135                        // for its underlying target ConnectionFactory, else JMS access code won't see
136                        // properly exposed transactions (i.e. transactions for the target ConnectionFactory).
137                        this.connectionFactory = ((TransactionAwareConnectionFactoryProxy) cf).getTargetConnectionFactory();
138                }
139                else {
140                        this.connectionFactory = cf;
141                }
142        }
143
144        /**
145         * Return the JMS ConnectionFactory that this instance should manage transactions for.
146         */
147        @Nullable
148        public ConnectionFactory getConnectionFactory() {
149                return this.connectionFactory;
150        }
151
152        /**
153         * Obtain the ConnectionFactory for actual use.
154         * @return the ConnectionFactory (never {@code null})
155         * @throws IllegalStateException in case of no ConnectionFactory set
156         * @since 5.0
157         */
158        protected final ConnectionFactory obtainConnectionFactory() {
159                ConnectionFactory connectionFactory = getConnectionFactory();
160                Assert.state(connectionFactory != null, "No ConnectionFactory set");
161                return connectionFactory;
162        }
163
164        /**
165         * Specify whether this transaction manager should lazily retrieve a JMS
166         * Connection and Session on access within a transaction ({@code true}).
167         * By default, it will eagerly create a JMS Connection and Session at
168         * transaction begin ({@code false}).
169         * @since 5.1.6
170         * @see JmsResourceHolder#getConnection()
171         * @see JmsResourceHolder#getSession()
172         */
173        public void setLazyResourceRetrieval(boolean lazyResourceRetrieval) {
174                this.lazyResourceRetrieval = lazyResourceRetrieval;
175        }
176
177        /**
178         * Make sure the ConnectionFactory has been set.
179         */
180        @Override
181        public void afterPropertiesSet() {
182                if (getConnectionFactory() == null) {
183                        throw new IllegalArgumentException("Property 'connectionFactory' is required");
184                }
185        }
186
187
188        @Override
189        public Object getResourceFactory() {
190                return obtainConnectionFactory();
191        }
192
193        @Override
194        protected Object doGetTransaction() {
195                JmsTransactionObject txObject = new JmsTransactionObject();
196                txObject.setResourceHolder(
197                                (JmsResourceHolder) TransactionSynchronizationManager.getResource(obtainConnectionFactory()));
198                return txObject;
199        }
200
201        @Override
202        protected boolean isExistingTransaction(Object transaction) {
203                JmsTransactionObject txObject = (JmsTransactionObject) transaction;
204                return txObject.hasResourceHolder();
205        }
206
207        @Override
208        protected void doBegin(Object transaction, TransactionDefinition definition) {
209                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
210                        throw new InvalidIsolationLevelException("JMS does not support an isolation level concept");
211                }
212
213                ConnectionFactory connectionFactory = obtainConnectionFactory();
214                JmsTransactionObject txObject = (JmsTransactionObject) transaction;
215                Connection con = null;
216                Session session = null;
217                try {
218                        JmsResourceHolder resourceHolder;
219                        if (this.lazyResourceRetrieval) {
220                                resourceHolder = new LazyJmsResourceHolder(connectionFactory);
221                        }
222                        else {
223                                con = createConnection();
224                                session = createSession(con);
225                                if (logger.isDebugEnabled()) {
226                                        logger.debug("Created JMS transaction on Session [" + session + "] from Connection [" + con + "]");
227                                }
228                                resourceHolder = new JmsResourceHolder(connectionFactory, con, session);
229                        }
230                        resourceHolder.setSynchronizedWithTransaction(true);
231                        int timeout = determineTimeout(definition);
232                        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
233                                resourceHolder.setTimeoutInSeconds(timeout);
234                        }
235                        txObject.setResourceHolder(resourceHolder);
236                        TransactionSynchronizationManager.bindResource(connectionFactory, resourceHolder);
237                }
238                catch (Throwable ex) {
239                        if (session != null) {
240                                try {
241                                        session.close();
242                                }
243                                catch (Throwable ex2) {
244                                        // ignore
245                                }
246                        }
247                        if (con != null) {
248                                try {
249                                        con.close();
250                                }
251                                catch (Throwable ex2) {
252                                        // ignore
253                                }
254                        }
255                        throw new CannotCreateTransactionException("Could not create JMS transaction", ex);
256                }
257        }
258
259        @Override
260        protected Object doSuspend(Object transaction) {
261                JmsTransactionObject txObject = (JmsTransactionObject) transaction;
262                txObject.setResourceHolder(null);
263                return TransactionSynchronizationManager.unbindResource(obtainConnectionFactory());
264        }
265
266        @Override
267        protected void doResume(@Nullable Object transaction, Object suspendedResources) {
268                TransactionSynchronizationManager.bindResource(obtainConnectionFactory(), suspendedResources);
269        }
270
271        @Override
272        protected void doCommit(DefaultTransactionStatus status) {
273                JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction();
274                Session session = txObject.getResourceHolder().getOriginalSession();
275                if (session != null) {
276                        try {
277                                if (status.isDebug()) {
278                                        logger.debug("Committing JMS transaction on Session [" + session + "]");
279                                }
280                                session.commit();
281                        }
282                        catch (TransactionRolledBackException ex) {
283                                throw new UnexpectedRollbackException("JMS transaction rolled back", ex);
284                        }
285                        catch (JMSException ex) {
286                                throw new TransactionSystemException("Could not commit JMS transaction", ex);
287                        }
288                }
289        }
290
291        @Override
292        protected void doRollback(DefaultTransactionStatus status) {
293                JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction();
294                Session session = txObject.getResourceHolder().getOriginalSession();
295                if (session != null) {
296                        try {
297                                if (status.isDebug()) {
298                                        logger.debug("Rolling back JMS transaction on Session [" + session + "]");
299                                }
300                                session.rollback();
301                        }
302                        catch (JMSException ex) {
303                                throw new TransactionSystemException("Could not roll back JMS transaction", ex);
304                        }
305                }
306        }
307
308        @Override
309        protected void doSetRollbackOnly(DefaultTransactionStatus status) {
310                JmsTransactionObject txObject = (JmsTransactionObject) status.getTransaction();
311                txObject.getResourceHolder().setRollbackOnly();
312        }
313
314        @Override
315        protected void doCleanupAfterCompletion(Object transaction) {
316                JmsTransactionObject txObject = (JmsTransactionObject) transaction;
317                TransactionSynchronizationManager.unbindResource(obtainConnectionFactory());
318                txObject.getResourceHolder().closeAll();
319                txObject.getResourceHolder().clear();
320        }
321
322
323        /**
324         * Create a JMS Connection via this template's ConnectionFactory.
325         * <p>This implementation uses JMS 1.1 API.
326         * @return the new JMS Connection
327         * @throws javax.jms.JMSException if thrown by JMS API methods
328         */
329        protected Connection createConnection() throws JMSException {
330                return obtainConnectionFactory().createConnection();
331        }
332
333        /**
334         * Create a JMS Session for the given Connection.
335         * <p>This implementation uses JMS 1.1 API.
336         * @param con the JMS Connection to create a Session for
337         * @return the new JMS Session
338         * @throws javax.jms.JMSException if thrown by JMS API methods
339         */
340        protected Session createSession(Connection con) throws JMSException {
341                return con.createSession(true, Session.AUTO_ACKNOWLEDGE);
342        }
343
344
345        /**
346         * Lazily initializing variant of {@link JmsResourceHolder},
347         * initializing a JMS Connection and Session on user access.
348         */
349        private class LazyJmsResourceHolder extends JmsResourceHolder {
350
351                private boolean connectionInitialized = false;
352
353                private boolean sessionInitialized = false;
354
355                public LazyJmsResourceHolder(@Nullable ConnectionFactory connectionFactory) {
356                        super(connectionFactory);
357                }
358
359                @Override
360                @Nullable
361                public Connection getConnection() {
362                        initializeConnection();
363                        return super.getConnection();
364                }
365
366                @Override
367                @Nullable
368                public <C extends Connection> C getConnection(Class<C> connectionType) {
369                        initializeConnection();
370                        return super.getConnection(connectionType);
371                }
372
373                @Override
374                @Nullable
375                public Session getSession() {
376                        initializeSession();
377                        return super.getSession();
378                }
379
380                @Override
381                @Nullable
382                public <S extends Session> S getSession(Class<S> sessionType) {
383                        initializeSession();
384                        return super.getSession(sessionType);
385                }
386
387                @Override
388                @Nullable
389                public <S extends Session> S getSession(Class<S> sessionType, @Nullable Connection connection) {
390                        initializeSession();
391                        return super.getSession(sessionType, connection);
392                }
393
394                private void initializeConnection() {
395                        if (!this.connectionInitialized) {
396                                try {
397                                        addConnection(createConnection());
398                                }
399                                catch (JMSException ex) {
400                                        throw new CannotCreateTransactionException(
401                                                        "Failed to lazily initialize JMS Connection for transaction", ex);
402                                }
403                                this.connectionInitialized = true;
404                        }
405                }
406
407                private void initializeSession() {
408                        if (!this.sessionInitialized) {
409                                Connection con = getConnection();
410                                Assert.state(con != null, "No transactional JMS Connection");
411                                try {
412                                        addSession(createSession(con), con);
413                                }
414                                catch (JMSException ex) {
415                                        throw new CannotCreateTransactionException(
416                                                        "Failed to lazily initialize JMS Session for transaction", ex);
417                                }
418                                this.sessionInitialized = true;
419                        }
420                }
421        }
422
423
424        /**
425         * JMS transaction object, representing a JmsResourceHolder.
426         * Used as transaction object by JmsTransactionManager.
427         * @see JmsResourceHolder
428         */
429        private static class JmsTransactionObject implements SmartTransactionObject {
430
431                @Nullable
432                private JmsResourceHolder resourceHolder;
433
434                public void setResourceHolder(@Nullable JmsResourceHolder resourceHolder) {
435                        this.resourceHolder = resourceHolder;
436                }
437
438                public JmsResourceHolder getResourceHolder() {
439                        Assert.state(this.resourceHolder != null, "No JmsResourceHolder available");
440                        return this.resourceHolder;
441                }
442
443                public boolean hasResourceHolder() {
444                        return (this.resourceHolder != null);
445                }
446
447                @Override
448                public boolean isRollbackOnly() {
449                        return (this.resourceHolder != null && this.resourceHolder.isRollbackOnly());
450                }
451
452                @Override
453                public void flush() {
454                        // no-op
455                }
456        }
457
458}