001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.JMSException;
022import javax.jms.Message;
023import javax.jms.MessageConsumer;
024import javax.jms.Session;
025
026import org.springframework.jms.connection.ConnectionFactoryUtils;
027import org.springframework.jms.connection.JmsResourceHolder;
028import org.springframework.jms.connection.SingleConnectionFactory;
029import org.springframework.jms.support.JmsUtils;
030import org.springframework.transaction.PlatformTransactionManager;
031import org.springframework.transaction.TransactionStatus;
032import org.springframework.transaction.support.DefaultTransactionDefinition;
033import org.springframework.transaction.support.ResourceTransactionManager;
034import org.springframework.transaction.support.TransactionSynchronizationManager;
035import org.springframework.transaction.support.TransactionSynchronizationUtils;
036
037/**
038 * Base class for listener container implementations which are based on polling.
039 * Provides support for listener handling based on {@link javax.jms.MessageConsumer},
040 * optionally participating in externally managed transactions.
041 *
042 * <p>This listener container variant is built for repeated polling attempts,
043 * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used
044 * may be reobtained fo reach attempt or cached in between attempts; this is up
045 * to the concrete implementation. The receive timeout for each attempt can be
046 * configured through the {@link #setReceiveTimeout "receiveTimeout"} property.
047 *
048 * <p>The underlying mechanism is based on standard JMS MessageConsumer handling,
049 * which is perfectly compatible with both native JMS and JMS in a Java EE environment.
050 * Neither the JMS {@code MessageConsumer.setMessageListener} facility  nor the JMS
051 * ServerSessionPool facility is required. A further advantage of this approach is
052 * full control over the listening process, allowing for custom scaling and throttling
053 * and of concurrent message processing (which is up to concrete subclasses).
054 *
055 * <p>Message reception and listener execution can automatically be wrapped
056 * in transactions through passing a Spring
057 * {@link org.springframework.transaction.PlatformTransactionManager} into the
058 * {@link #setTransactionManager "transactionManager"} property. This will usually
059 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
060 * Java EE environment, in combination with a JTA-aware JMS ConnectionFactory
061 * obtained from JNDI (check your application server's documentation).
062 *
063 * <p>This base class does not assume any specific mechanism for asynchronous
064 * execution of polling invokers. Check out {@link DefaultMessageListenerContainer}
065 * for a concrete implementation which is based on Spring's
066 * {@link org.springframework.core.task.TaskExecutor} abstraction,
067 * including dynamic scaling of concurrent consumers and automatic self recovery.
068 *
069 * @author Juergen Hoeller
070 * @since 2.0.3
071 * @see #createListenerConsumer
072 * @see #receiveAndExecute
073 * @see #setTransactionManager
074 */
075public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer {
076
077        /**
078         * The default receive timeout: 1000 ms = 1 second.
079         */
080        public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
081
082
083        private final MessageListenerContainerResourceFactory transactionalResourceFactory =
084                        new MessageListenerContainerResourceFactory();
085
086        private boolean sessionTransactedCalled = false;
087
088        private PlatformTransactionManager transactionManager;
089
090        private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
091
092        private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
093
094
095        @Override
096        public void setSessionTransacted(boolean sessionTransacted) {
097                super.setSessionTransacted(sessionTransacted);
098                this.sessionTransactedCalled = true;
099        }
100
101        /**
102         * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
103         * to use for transactional wrapping of message reception plus listener execution.
104         * <p>Default is none, not performing any transactional wrapping.
105         * If specified, this will usually be a Spring
106         * {@link org.springframework.transaction.jta.JtaTransactionManager} or one
107         * of its subclasses, in combination with a JTA-aware ConnectionFactory that
108         * this message listener container obtains its Connections from.
109         * <p><b>Note: Consider the use of local JMS transactions instead.</b>
110         * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag
111         * to "true" in order to use a locally transacted JMS Session for the entire
112         * receive processing, including any Session operations performed by a
113         * {@link SessionAwareMessageListener} (e.g. sending a response message). This
114         * allows for fully synchronized Spring transactions based on local JMS
115         * transactions, similar to what
116         * {@link org.springframework.jms.connection.JmsTransactionManager} provides. Check
117         * {@link AbstractMessageListenerContainer}'s javadoc for
118         * a discussion of transaction choices and message redelivery scenarios.
119         * @see #setSessionTransacted(boolean)
120         * @see org.springframework.transaction.jta.JtaTransactionManager
121         * @see org.springframework.jms.connection.JmsTransactionManager
122         */
123        public void setTransactionManager(PlatformTransactionManager transactionManager) {
124                this.transactionManager = transactionManager;
125        }
126
127        /**
128         * Return the Spring PlatformTransactionManager to use for transactional
129         * wrapping of message reception plus listener execution.
130         */
131        protected final PlatformTransactionManager getTransactionManager() {
132                return this.transactionManager;
133        }
134
135        /**
136         * Specify the transaction name to use for transactional wrapping.
137         * Default is the bean name of this listener container, if any.
138         * @see org.springframework.transaction.TransactionDefinition#getName()
139         */
140        public void setTransactionName(String transactionName) {
141                this.transactionDefinition.setName(transactionName);
142        }
143
144        /**
145         * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
146         * Default is none, using the transaction manager's default timeout.
147         * @see org.springframework.transaction.TransactionDefinition#getTimeout()
148         * @see #setReceiveTimeout
149         */
150        public void setTransactionTimeout(int transactionTimeout) {
151                this.transactionDefinition.setTimeout(transactionTimeout);
152        }
153
154        /**
155         * Set the timeout to use for receive calls, in <b>milliseconds</b>.
156         * The default is 1000 ms, that is, 1 second.
157         * <p><b>NOTE:</b> This value needs to be smaller than the transaction
158         * timeout used by the transaction manager (in the appropriate unit,
159         * of course). 0 indicates no timeout at all; however, this is only
160         * feasible if not running within a transaction manager and generally
161         * discouraged since such a listener container cannot cleanly shut down.
162         * A negative value such as -1 indicates a no-wait receive operation.
163         * @see #receiveFromConsumer(MessageConsumer, long)
164         * @see javax.jms.MessageConsumer#receive(long)
165         * @see javax.jms.MessageConsumer#receiveNoWait()
166         * @see javax.jms.MessageConsumer#receive()
167         * @see #setTransactionTimeout
168         */
169        public void setReceiveTimeout(long receiveTimeout) {
170                this.receiveTimeout = receiveTimeout;
171        }
172
173        /**
174         * Return the receive timeout (ms) configured for this listener container.
175         * @since 4.2
176         */
177        protected long getReceiveTimeout() {
178                return this.receiveTimeout;
179        }
180
181
182        @Override
183        public void initialize() {
184                // Set sessionTransacted=true in case of a non-JTA transaction manager.
185                if (!this.sessionTransactedCalled &&
186                                this.transactionManager instanceof ResourceTransactionManager &&
187                                !TransactionSynchronizationUtils.sameResourceFactory(
188                                                (ResourceTransactionManager) this.transactionManager, getConnectionFactory())) {
189                        super.setSessionTransacted(true);
190                }
191
192                // Use bean name as default transaction name.
193                if (this.transactionDefinition.getName() == null) {
194                        this.transactionDefinition.setName(getBeanName());
195                }
196
197                // Proceed with superclass initialization.
198                super.initialize();
199        }
200
201
202        /**
203         * Create a MessageConsumer for the given JMS Session,
204         * registering a MessageListener for the specified listener.
205         * @param session the JMS Session to work on
206         * @return the MessageConsumer
207         * @throws javax.jms.JMSException if thrown by JMS methods
208         * @see #receiveAndExecute
209         */
210        protected MessageConsumer createListenerConsumer(Session session) throws JMSException {
211                Destination destination = getDestination();
212                if (destination == null) {
213                        destination = resolveDestinationName(session, getDestinationName());
214                }
215                return createConsumer(session, destination);
216        }
217
218        /**
219         * Execute the listener for a message received from the given consumer,
220         * wrapping the entire operation in an external transaction if demanded.
221         * @param session the JMS Session to work on
222         * @param consumer the MessageConsumer to work on
223         * @return whether a message has been received
224         * @throws JMSException if thrown by JMS methods
225         * @see #doReceiveAndExecute
226         */
227        protected boolean receiveAndExecute(Object invoker, Session session, MessageConsumer consumer)
228                        throws JMSException {
229
230                if (this.transactionManager != null) {
231                        // Execute receive within transaction.
232                        TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
233                        boolean messageReceived;
234                        try {
235                                messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
236                        }
237                        catch (JMSException ex) {
238                                rollbackOnException(status, ex);
239                                throw ex;
240                        }
241                        catch (RuntimeException ex) {
242                                rollbackOnException(status, ex);
243                                throw ex;
244                        }
245                        catch (Error err) {
246                                rollbackOnException(status, err);
247                                throw err;
248                        }
249                        this.transactionManager.commit(status);
250                        return messageReceived;
251                }
252
253                else {
254                        // Execute receive outside of transaction.
255                        return doReceiveAndExecute(invoker, session, consumer, null);
256                }
257        }
258
259        /**
260         * Actually execute the listener for a message received from the given consumer,
261         * fetching all requires resources and invoking the listener.
262         * @param session the JMS Session to work on
263         * @param consumer the MessageConsumer to work on
264         * @param status the TransactionStatus (may be {@code null})
265         * @return whether a message has been received
266         * @throws JMSException if thrown by JMS methods
267         * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
268         */
269        protected boolean doReceiveAndExecute(
270                        Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
271                        throws JMSException {
272
273                Connection conToClose = null;
274                Session sessionToClose = null;
275                MessageConsumer consumerToClose = null;
276                try {
277                        Session sessionToUse = session;
278                        boolean transactional = false;
279                        if (sessionToUse == null) {
280                                sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
281                                                getConnectionFactory(), this.transactionalResourceFactory, true);
282                                transactional = (sessionToUse != null);
283                        }
284                        if (sessionToUse == null) {
285                                Connection conToUse;
286                                if (sharedConnectionEnabled()) {
287                                        conToUse = getSharedConnection();
288                                }
289                                else {
290                                        conToUse = createConnection();
291                                        conToClose = conToUse;
292                                        conToUse.start();
293                                }
294                                sessionToUse = createSession(conToUse);
295                                sessionToClose = sessionToUse;
296                        }
297                        MessageConsumer consumerToUse = consumer;
298                        if (consumerToUse == null) {
299                                consumerToUse = createListenerConsumer(sessionToUse);
300                                consumerToClose = consumerToUse;
301                        }
302                        Message message = receiveMessage(consumerToUse);
303                        if (message != null) {
304                                if (logger.isDebugEnabled()) {
305                                        logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
306                                                        consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
307                                                        sessionToUse + "]");
308                                }
309                                messageReceived(invoker, sessionToUse);
310                                boolean exposeResource = (!transactional && isExposeListenerSession() &&
311                                                !TransactionSynchronizationManager.hasResource(getConnectionFactory()));
312                                if (exposeResource) {
313                                        TransactionSynchronizationManager.bindResource(
314                                                        getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
315                                }
316                                try {
317                                        doExecuteListener(sessionToUse, message);
318                                }
319                                catch (Throwable ex) {
320                                        if (status != null) {
321                                                if (logger.isDebugEnabled()) {
322                                                        logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
323                                                }
324                                                status.setRollbackOnly();
325                                        }
326                                        handleListenerException(ex);
327                                        // Rethrow JMSException to indicate an infrastructure problem
328                                        // that may have to trigger recovery...
329                                        if (ex instanceof JMSException) {
330                                                throw (JMSException) ex;
331                                        }
332                                }
333                                finally {
334                                        if (exposeResource) {
335                                                TransactionSynchronizationManager.unbindResource(getConnectionFactory());
336                                        }
337                                }
338                                // Indicate that a message has been received.
339                                return true;
340                        }
341                        else {
342                                if (logger.isTraceEnabled()) {
343                                        logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
344                                                        "session [" + sessionToUse + "] did not receive a message");
345                                }
346                                noMessageReceived(invoker, sessionToUse);
347                                // Nevertheless call commit, in order to reset the transaction timeout (if any).
348                                if (shouldCommitAfterNoMessageReceived(sessionToUse)) {
349                                        commitIfNecessary(sessionToUse, message);
350                                }
351                                // Indicate that no message has been received.
352                                return false;
353                        }
354                }
355                finally {
356                        JmsUtils.closeMessageConsumer(consumerToClose);
357                        JmsUtils.closeSession(sessionToClose);
358                        ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
359                }
360        }
361
362        /**
363         * This implementation checks whether the Session is externally synchronized.
364         * In this case, the Session is not locally transacted, despite the listener
365         * container's "sessionTransacted" flag being set to "true".
366         * @see org.springframework.jms.connection.JmsResourceHolder
367         */
368        @Override
369        protected boolean isSessionLocallyTransacted(Session session) {
370                if (!super.isSessionLocallyTransacted(session)) {
371                        return false;
372                }
373                JmsResourceHolder resourceHolder =
374                                (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());
375                return (resourceHolder == null || resourceHolder instanceof LocallyExposedJmsResourceHolder ||
376                                !resourceHolder.containsSession(session));
377        }
378
379        /**
380         * Determine whether to trigger a commit after no message has been received.
381         * This is a good idea on any modern-day JMS provider.
382         * @param session the current JMS Session which received no message
383         * @return whether to call {@link #commitIfNecessary} on the given Session
384         */
385        protected boolean shouldCommitAfterNoMessageReceived(Session session) {
386                return true;
387        }
388
389        /**
390         * Perform a rollback, handling rollback exceptions properly.
391         * @param status object representing the transaction
392         * @param ex the thrown listener exception or error
393         */
394        private void rollbackOnException(TransactionStatus status, Throwable ex) {
395                logger.debug("Initiating transaction rollback on listener exception", ex);
396                try {
397                        this.transactionManager.rollback(status);
398                }
399                catch (RuntimeException ex2) {
400                        logger.error("Listener exception overridden by rollback exception", ex);
401                        throw ex2;
402                }
403                catch (Error err) {
404                        logger.error("Listener exception overridden by rollback error", ex);
405                        throw err;
406                }
407        }
408
409        /**
410         * Receive a message from the given consumer.
411         * @param consumer the MessageConsumer to use
412         * @return the Message, or {@code null} if none
413         * @throws JMSException if thrown by JMS methods
414         */
415        protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
416                return receiveFromConsumer(consumer, getReceiveTimeout());
417        }
418
419        /**
420         * Template method that gets called right when a new message has been received,
421         * before attempting to process it. Allows subclasses to react to the event
422         * of an actual incoming message, for example adapting their consumer count.
423         * @param invoker the invoker object (passed through)
424         * @param session the receiving JMS Session
425         */
426        protected void messageReceived(Object invoker, Session session) {
427        }
428
429        /**
430         * Template method that gets called when <i>no</i> message has been received,
431         * before returning to the receive loop again. Allows subclasses to react to
432         * the event of no incoming message, for example marking the invoker as idle.
433         * @param invoker the invoker object (passed through)
434         * @param session the receiving JMS Session
435         */
436        protected void noMessageReceived(Object invoker, Session session) {
437        }
438
439        /**
440         * Fetch an appropriate Connection from the given JmsResourceHolder.
441         * <p>This implementation accepts any JMS 1.1 Connection.
442         * @param holder the JmsResourceHolder
443         * @return an appropriate Connection fetched from the holder,
444         * or {@code null} if none found
445         */
446        protected Connection getConnection(JmsResourceHolder holder) {
447                return holder.getConnection();
448        }
449
450        /**
451         * Fetch an appropriate Session from the given JmsResourceHolder.
452         * <p>This implementation accepts any JMS 1.1 Session.
453         * @param holder the JmsResourceHolder
454         * @return an appropriate Session fetched from the holder,
455         * or {@code null} if none found
456         */
457        protected Session getSession(JmsResourceHolder holder) {
458                return holder.getSession();
459        }
460
461
462        /**
463         * ResourceFactory implementation that delegates to this listener container's protected callback methods.
464         */
465        private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
466
467                @Override
468                public Connection getConnection(JmsResourceHolder holder) {
469                        return AbstractPollingMessageListenerContainer.this.getConnection(holder);
470                }
471
472                @Override
473                public Session getSession(JmsResourceHolder holder) {
474                        return AbstractPollingMessageListenerContainer.this.getSession(holder);
475                }
476
477                @Override
478                public Connection createConnection() throws JMSException {
479                        if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
480                                Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection();
481                                return new SingleConnectionFactory(sharedCon).createConnection();
482                        }
483                        else {
484                                return AbstractPollingMessageListenerContainer.this.createConnection();
485                        }
486                }
487
488                @Override
489                public Session createSession(Connection con) throws JMSException {
490                        return AbstractPollingMessageListenerContainer.this.createSession(con);
491                }
492
493                @Override
494                public boolean isSynchedLocalTransactionAllowed() {
495                        return AbstractPollingMessageListenerContainer.this.isSessionTransacted();
496                }
497        }
498
499}