001/*
002 * Copyright 2002-2017 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import javax.jms.Connection;
020import javax.jms.Destination;
021import javax.jms.JMSException;
022import javax.jms.Message;
023import javax.jms.MessageConsumer;
024import javax.jms.Session;
025
026import org.springframework.jms.connection.ConnectionFactoryUtils;
027import org.springframework.jms.connection.JmsResourceHolder;
028import org.springframework.jms.connection.SingleConnectionFactory;
029import org.springframework.jms.support.JmsUtils;
030import org.springframework.lang.Nullable;
031import org.springframework.transaction.PlatformTransactionManager;
032import org.springframework.transaction.TransactionStatus;
033import org.springframework.transaction.support.DefaultTransactionDefinition;
034import org.springframework.transaction.support.ResourceTransactionManager;
035import org.springframework.transaction.support.TransactionSynchronizationManager;
036import org.springframework.transaction.support.TransactionSynchronizationUtils;
037import org.springframework.util.Assert;
038
039/**
040 * Base class for listener container implementations which are based on polling.
041 * Provides support for listener handling based on {@link javax.jms.MessageConsumer},
042 * optionally participating in externally managed transactions.
043 *
044 * <p>This listener container variant is built for repeated polling attempts,
045 * each invoking the {@link #receiveAndExecute} method. The MessageConsumer used
046 * may be reobtained fo reach attempt or cached in between attempts; this is up
047 * to the concrete implementation. The receive timeout for each attempt can be
048 * configured through the {@link #setReceiveTimeout "receiveTimeout"} property.
049 *
050 * <p>The underlying mechanism is based on standard JMS MessageConsumer handling,
051 * which is perfectly compatible with both native JMS and JMS in a Java EE environment.
052 * Neither the JMS {@code MessageConsumer.setMessageListener} facility  nor the JMS
053 * ServerSessionPool facility is required. A further advantage of this approach is
054 * full control over the listening process, allowing for custom scaling and throttling
055 * and of concurrent message processing (which is up to concrete subclasses).
056 *
057 * <p>Message reception and listener execution can automatically be wrapped
058 * in transactions through passing a Spring
059 * {@link org.springframework.transaction.PlatformTransactionManager} into the
060 * {@link #setTransactionManager "transactionManager"} property. This will usually
061 * be a {@link org.springframework.transaction.jta.JtaTransactionManager} in a
062 * Java EE environment, in combination with a JTA-aware JMS ConnectionFactory
063 * obtained from JNDI (check your application server's documentation).
064 *
065 * <p>This base class does not assume any specific mechanism for asynchronous
066 * execution of polling invokers. Check out {@link DefaultMessageListenerContainer}
067 * for a concrete implementation which is based on Spring's
068 * {@link org.springframework.core.task.TaskExecutor} abstraction,
069 * including dynamic scaling of concurrent consumers and automatic self recovery.
070 *
071 * @author Juergen Hoeller
072 * @since 2.0.3
073 * @see #createListenerConsumer
074 * @see #receiveAndExecute
075 * @see #setTransactionManager
076 */
077public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer {
078
079        /**
080         * The default receive timeout: 1000 ms = 1 second.
081         */
082        public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
083
084
085        private final MessageListenerContainerResourceFactory transactionalResourceFactory =
086                        new MessageListenerContainerResourceFactory();
087
088        private boolean sessionTransactedCalled = false;
089
090        @Nullable
091        private PlatformTransactionManager transactionManager;
092
093        private DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
094
095        private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
096
097
098        @Override
099        public void setSessionTransacted(boolean sessionTransacted) {
100                super.setSessionTransacted(sessionTransacted);
101                this.sessionTransactedCalled = true;
102        }
103
104        /**
105         * Specify the Spring {@link org.springframework.transaction.PlatformTransactionManager}
106         * to use for transactional wrapping of message reception plus listener execution.
107         * <p>Default is none, not performing any transactional wrapping.
108         * If specified, this will usually be a Spring
109         * {@link org.springframework.transaction.jta.JtaTransactionManager} or one
110         * of its subclasses, in combination with a JTA-aware ConnectionFactory that
111         * this message listener container obtains its Connections from.
112         * <p><b>Note: Consider the use of local JMS transactions instead.</b>
113         * Simply switch the {@link #setSessionTransacted "sessionTransacted"} flag
114         * to "true" in order to use a locally transacted JMS Session for the entire
115         * receive processing, including any Session operations performed by a
116         * {@link SessionAwareMessageListener} (e.g. sending a response message). This
117         * allows for fully synchronized Spring transactions based on local JMS
118         * transactions, similar to what
119         * {@link org.springframework.jms.connection.JmsTransactionManager} provides. Check
120         * {@link AbstractMessageListenerContainer}'s javadoc for
121         * a discussion of transaction choices and message redelivery scenarios.
122         * @see #setSessionTransacted(boolean)
123         * @see org.springframework.transaction.jta.JtaTransactionManager
124         * @see org.springframework.jms.connection.JmsTransactionManager
125         */
126        public void setTransactionManager(@Nullable PlatformTransactionManager transactionManager) {
127                this.transactionManager = transactionManager;
128        }
129
130        /**
131         * Return the Spring PlatformTransactionManager to use for transactional
132         * wrapping of message reception plus listener execution.
133         */
134        @Nullable
135        protected final PlatformTransactionManager getTransactionManager() {
136                return this.transactionManager;
137        }
138
139        /**
140         * Specify the transaction name to use for transactional wrapping.
141         * Default is the bean name of this listener container, if any.
142         * @see org.springframework.transaction.TransactionDefinition#getName()
143         */
144        public void setTransactionName(String transactionName) {
145                this.transactionDefinition.setName(transactionName);
146        }
147
148        /**
149         * Specify the transaction timeout to use for transactional wrapping, in <b>seconds</b>.
150         * Default is none, using the transaction manager's default timeout.
151         * @see org.springframework.transaction.TransactionDefinition#getTimeout()
152         * @see #setReceiveTimeout
153         */
154        public void setTransactionTimeout(int transactionTimeout) {
155                this.transactionDefinition.setTimeout(transactionTimeout);
156        }
157
158        /**
159         * Set the timeout to use for receive calls, in <b>milliseconds</b>.
160         * The default is 1000 ms, that is, 1 second.
161         * <p><b>NOTE:</b> This value needs to be smaller than the transaction
162         * timeout used by the transaction manager (in the appropriate unit,
163         * of course). 0 indicates no timeout at all; however, this is only
164         * feasible if not running within a transaction manager and generally
165         * discouraged since such a listener container cannot cleanly shut down.
166         * A negative value such as -1 indicates a no-wait receive operation.
167         * @see #receiveFromConsumer(MessageConsumer, long)
168         * @see javax.jms.MessageConsumer#receive(long)
169         * @see javax.jms.MessageConsumer#receiveNoWait()
170         * @see javax.jms.MessageConsumer#receive()
171         * @see #setTransactionTimeout
172         */
173        public void setReceiveTimeout(long receiveTimeout) {
174                this.receiveTimeout = receiveTimeout;
175        }
176
177        /**
178         * Return the receive timeout (ms) configured for this listener container.
179         * @since 4.2
180         */
181        protected long getReceiveTimeout() {
182                return this.receiveTimeout;
183        }
184
185
186        @Override
187        public void initialize() {
188                // Set sessionTransacted=true in case of a non-JTA transaction manager.
189                if (!this.sessionTransactedCalled &&
190                                this.transactionManager instanceof ResourceTransactionManager &&
191                                !TransactionSynchronizationUtils.sameResourceFactory(
192                                                (ResourceTransactionManager) this.transactionManager, obtainConnectionFactory())) {
193                        super.setSessionTransacted(true);
194                }
195
196                // Use bean name as default transaction name.
197                if (this.transactionDefinition.getName() == null) {
198                        String beanName = getBeanName();
199                        if (beanName != null) {
200                                this.transactionDefinition.setName(beanName);
201                        }
202                }
203
204                // Proceed with superclass initialization.
205                super.initialize();
206        }
207
208
209        /**
210         * Create a MessageConsumer for the given JMS Session,
211         * registering a MessageListener for the specified listener.
212         * @param session the JMS Session to work on
213         * @return the MessageConsumer
214         * @throws javax.jms.JMSException if thrown by JMS methods
215         * @see #receiveAndExecute
216         */
217        protected MessageConsumer createListenerConsumer(Session session) throws JMSException {
218                Destination destination = getDestination();
219                if (destination == null) {
220                        String destinationName = getDestinationName();
221                        Assert.state(destinationName != null, "No destination set");
222                        destination = resolveDestinationName(session, destinationName);
223                }
224                return createConsumer(session, destination);
225        }
226
227        /**
228         * Execute the listener for a message received from the given consumer,
229         * wrapping the entire operation in an external transaction if demanded.
230         * @param session the JMS Session to work on
231         * @param consumer the MessageConsumer to work on
232         * @return whether a message has been received
233         * @throws JMSException if thrown by JMS methods
234         * @see #doReceiveAndExecute
235         */
236        protected boolean receiveAndExecute(
237                        Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer)
238                        throws JMSException {
239
240                if (this.transactionManager != null) {
241                        // Execute receive within transaction.
242                        TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
243                        boolean messageReceived;
244                        try {
245                                messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
246                        }
247                        catch (JMSException | RuntimeException | Error ex) {
248                                rollbackOnException(this.transactionManager, status, ex);
249                                throw ex;
250                        }
251                        this.transactionManager.commit(status);
252                        return messageReceived;
253                }
254
255                else {
256                        // Execute receive outside of transaction.
257                        return doReceiveAndExecute(invoker, session, consumer, null);
258                }
259        }
260
261        /**
262         * Actually execute the listener for a message received from the given consumer,
263         * fetching all requires resources and invoking the listener.
264         * @param session the JMS Session to work on
265         * @param consumer the MessageConsumer to work on
266         * @param status the TransactionStatus (may be {@code null})
267         * @return whether a message has been received
268         * @throws JMSException if thrown by JMS methods
269         * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
270         */
271        protected boolean doReceiveAndExecute(Object invoker, @Nullable Session session,
272                        @Nullable MessageConsumer consumer, @Nullable TransactionStatus status) throws JMSException {
273
274                Connection conToClose = null;
275                Session sessionToClose = null;
276                MessageConsumer consumerToClose = null;
277                try {
278                        Session sessionToUse = session;
279                        boolean transactional = false;
280                        if (sessionToUse == null) {
281                                sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
282                                                obtainConnectionFactory(), this.transactionalResourceFactory, true);
283                                transactional = (sessionToUse != null);
284                        }
285                        if (sessionToUse == null) {
286                                Connection conToUse;
287                                if (sharedConnectionEnabled()) {
288                                        conToUse = getSharedConnection();
289                                }
290                                else {
291                                        conToUse = createConnection();
292                                        conToClose = conToUse;
293                                        conToUse.start();
294                                }
295                                sessionToUse = createSession(conToUse);
296                                sessionToClose = sessionToUse;
297                        }
298                        MessageConsumer consumerToUse = consumer;
299                        if (consumerToUse == null) {
300                                consumerToUse = createListenerConsumer(sessionToUse);
301                                consumerToClose = consumerToUse;
302                        }
303                        Message message = receiveMessage(consumerToUse);
304                        if (message != null) {
305                                if (logger.isDebugEnabled()) {
306                                        logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
307                                                        consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
308                                                        sessionToUse + "]");
309                                }
310                                messageReceived(invoker, sessionToUse);
311                                boolean exposeResource = (!transactional && isExposeListenerSession() &&
312                                                !TransactionSynchronizationManager.hasResource(obtainConnectionFactory()));
313                                if (exposeResource) {
314                                        TransactionSynchronizationManager.bindResource(
315                                                        obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
316                                }
317                                try {
318                                        doExecuteListener(sessionToUse, message);
319                                }
320                                catch (Throwable ex) {
321                                        if (status != null) {
322                                                if (logger.isDebugEnabled()) {
323                                                        logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
324                                                }
325                                                status.setRollbackOnly();
326                                        }
327                                        handleListenerException(ex);
328                                        // Rethrow JMSException to indicate an infrastructure problem
329                                        // that may have to trigger recovery...
330                                        if (ex instanceof JMSException) {
331                                                throw (JMSException) ex;
332                                        }
333                                }
334                                finally {
335                                        if (exposeResource) {
336                                                TransactionSynchronizationManager.unbindResource(obtainConnectionFactory());
337                                        }
338                                }
339                                // Indicate that a message has been received.
340                                return true;
341                        }
342                        else {
343                                if (logger.isTraceEnabled()) {
344                                        logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
345                                                        "session [" + sessionToUse + "] did not receive a message");
346                                }
347                                noMessageReceived(invoker, sessionToUse);
348                                // Nevertheless call commit, in order to reset the transaction timeout (if any).
349                                if (shouldCommitAfterNoMessageReceived(sessionToUse)) {
350                                        commitIfNecessary(sessionToUse, null);
351                                }
352                                // Indicate that no message has been received.
353                                return false;
354                        }
355                }
356                finally {
357                        JmsUtils.closeMessageConsumer(consumerToClose);
358                        JmsUtils.closeSession(sessionToClose);
359                        ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
360                }
361        }
362
363        /**
364         * This implementation checks whether the Session is externally synchronized.
365         * In this case, the Session is not locally transacted, despite the listener
366         * container's "sessionTransacted" flag being set to "true".
367         * @see org.springframework.jms.connection.JmsResourceHolder
368         */
369        @Override
370        protected boolean isSessionLocallyTransacted(Session session) {
371                if (!super.isSessionLocallyTransacted(session)) {
372                        return false;
373                }
374                JmsResourceHolder resourceHolder =
375                                (JmsResourceHolder) TransactionSynchronizationManager.getResource(obtainConnectionFactory());
376                return (resourceHolder == null || resourceHolder instanceof LocallyExposedJmsResourceHolder ||
377                                !resourceHolder.containsSession(session));
378        }
379
380        /**
381         * Determine whether to trigger a commit after no message has been received.
382         * This is a good idea on any modern-day JMS provider.
383         * @param session the current JMS Session which received no message
384         * @return whether to call {@link #commitIfNecessary} on the given Session
385         */
386        protected boolean shouldCommitAfterNoMessageReceived(Session session) {
387                return true;
388        }
389
390        /**
391         * Perform a rollback, handling rollback exceptions properly.
392         * @param status object representing the transaction
393         * @param ex the thrown listener exception or error
394         */
395        private void rollbackOnException(PlatformTransactionManager manager, TransactionStatus status, Throwable ex) {
396                logger.debug("Initiating transaction rollback on listener exception", ex);
397                try {
398                        manager.rollback(status);
399                }
400                catch (RuntimeException ex2) {
401                        logger.error("Listener exception overridden by rollback exception", ex);
402                        throw ex2;
403                }
404                catch (Error err) {
405                        logger.error("Listener exception overridden by rollback error", ex);
406                        throw err;
407                }
408        }
409
410        /**
411         * Receive a message from the given consumer.
412         * @param consumer the MessageConsumer to use
413         * @return the Message, or {@code null} if none
414         * @throws JMSException if thrown by JMS methods
415         */
416        @Nullable
417        protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
418                return receiveFromConsumer(consumer, getReceiveTimeout());
419        }
420
421        /**
422         * Template method that gets called right when a new message has been received,
423         * before attempting to process it. Allows subclasses to react to the event
424         * of an actual incoming message, for example adapting their consumer count.
425         * @param invoker the invoker object (passed through)
426         * @param session the receiving JMS Session
427         */
428        protected void messageReceived(Object invoker, Session session) {
429        }
430
431        /**
432         * Template method that gets called when <i>no</i> message has been received,
433         * before returning to the receive loop again. Allows subclasses to react to
434         * the event of no incoming message, for example marking the invoker as idle.
435         * @param invoker the invoker object (passed through)
436         * @param session the receiving JMS Session
437         */
438        protected void noMessageReceived(Object invoker, Session session) {
439        }
440
441        /**
442         * Fetch an appropriate Connection from the given JmsResourceHolder.
443         * <p>This implementation accepts any JMS 1.1 Connection.
444         * @param holder the JmsResourceHolder
445         * @return an appropriate Connection fetched from the holder,
446         * or {@code null} if none found
447         */
448        @Nullable
449        protected Connection getConnection(JmsResourceHolder holder) {
450                return holder.getConnection();
451        }
452
453        /**
454         * Fetch an appropriate Session from the given JmsResourceHolder.
455         * <p>This implementation accepts any JMS 1.1 Session.
456         * @param holder the JmsResourceHolder
457         * @return an appropriate Session fetched from the holder,
458         * or {@code null} if none found
459         */
460        @Nullable
461        protected Session getSession(JmsResourceHolder holder) {
462                return holder.getSession();
463        }
464
465
466        /**
467         * ResourceFactory implementation that delegates to this listener container's protected callback methods.
468         */
469        private class MessageListenerContainerResourceFactory implements ConnectionFactoryUtils.ResourceFactory {
470
471                @Override
472                @Nullable
473                public Connection getConnection(JmsResourceHolder holder) {
474                        return AbstractPollingMessageListenerContainer.this.getConnection(holder);
475                }
476
477                @Override
478                @Nullable
479                public Session getSession(JmsResourceHolder holder) {
480                        return AbstractPollingMessageListenerContainer.this.getSession(holder);
481                }
482
483                @Override
484                public Connection createConnection() throws JMSException {
485                        if (AbstractPollingMessageListenerContainer.this.sharedConnectionEnabled()) {
486                                Connection sharedCon = AbstractPollingMessageListenerContainer.this.getSharedConnection();
487                                return new SingleConnectionFactory(sharedCon).createConnection();
488                        }
489                        else {
490                                return AbstractPollingMessageListenerContainer.this.createConnection();
491                        }
492                }
493
494                @Override
495                public Session createSession(Connection con) throws JMSException {
496                        return AbstractPollingMessageListenerContainer.this.createSession(con);
497                }
498
499                @Override
500                public boolean isSynchedLocalTransactionAllowed() {
501                        return AbstractPollingMessageListenerContainer.this.isSessionTransacted();
502                }
503        }
504
505}