001/*
002 * Copyright 2002-2020 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import java.util.HashSet;
020import java.util.Set;
021import java.util.concurrent.Executor;
022
023import javax.jms.Connection;
024import javax.jms.ConnectionFactory;
025import javax.jms.Destination;
026import javax.jms.ExceptionListener;
027import javax.jms.JMSException;
028import javax.jms.Message;
029import javax.jms.MessageConsumer;
030import javax.jms.Session;
031
032import org.springframework.jms.support.JmsUtils;
033import org.springframework.lang.Nullable;
034import org.springframework.transaction.support.TransactionSynchronizationManager;
035import org.springframework.util.Assert;
036
037/**
038 * Message listener container that uses the plain JMS client API's
039 * {@code MessageConsumer.setMessageListener()} method to
040 * create concurrent MessageConsumers for the specified listeners.
041 *
042 * <p>This is the simplest form of a message listener container.
043 * It creates a fixed number of JMS Sessions to invoke the listener,
044 * not allowing for dynamic adaptation to runtime demands. Its main
045 * advantage is its low level of complexity and the minimum requirements
046 * on the JMS provider: Not even the ServerSessionPool facility is required.
047 *
048 * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
049 * on acknowledge modes and transaction options. Note that this container
050 * exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode:
051 * that is, automatic message acknowledgment after listener execution,
052 * with no redelivery in case of a user exception thrown but potential
053 * redelivery in case of the JVM dying during listener execution.
054 *
055 * <p>For a different style of MessageListener handling, through looped
056 * {@code MessageConsumer.receive()} calls that also allow for
057 * transactional reception of messages (registering them with XA transactions),
058 * see {@link DefaultMessageListenerContainer}.
059 *
060 * @author Juergen Hoeller
061 * @since 2.0
062 * @see javax.jms.MessageConsumer#setMessageListener
063 * @see DefaultMessageListenerContainer
064 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
065 */
066public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements ExceptionListener {
067
068        private boolean connectLazily = false;
069
070        private boolean recoverOnException = true;
071
072        private int concurrentConsumers = 1;
073
074        @Nullable
075        private Executor taskExecutor;
076
077        @Nullable
078        private Set<Session> sessions;
079
080        @Nullable
081        private Set<MessageConsumer> consumers;
082
083        private final Object consumersMonitor = new Object();
084
085
086        /**
087         * Specify whether to connect lazily, i.e. whether to establish the JMS Connection
088         * and the corresponding Sessions and MessageConsumers as late as possible -
089         * in the start phase of this container.
090         * <p>Default is "false": connecting early, i.e. during the bean initialization phase.
091         * Set this flag to "true" in order to switch to lazy connecting if your target broker
092         * is likely to not have started up yet and you prefer to not even try a connection.
093         * @see #start()
094         * @see #initialize()
095         */
096        public void setConnectLazily(boolean connectLazily) {
097                this.connectLazily = connectLazily;
098        }
099
100        /**
101         * Specify whether to explicitly recover the shared JMS Connection and the
102         * associated Sessions and MessageConsumers whenever a JMSException is reported.
103         * <p>Default is "true": refreshing the shared connection and re-initializing the
104         * consumers whenever the connection propagates an exception to its listener.
105         * Set this flag to "false" in order to rely on automatic recovery within the
106         * provider, holding on to the existing connection and consumer handles.
107         * @since 5.1.8
108         * @see #onException(JMSException)
109         * @see Connection#setExceptionListener
110         */
111        public void setRecoverOnException(boolean recoverOnException) {
112                this.recoverOnException = recoverOnException;
113        }
114
115        /**
116         * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
117         * upper limit String, e.g. "10".
118         * <p>This listener container will always hold on to the maximum number of
119         * consumers {@link #setConcurrentConsumers} since it is unable to scale.
120         * <p>This property is primarily supported for configuration compatibility with
121         * {@link DefaultMessageListenerContainer}. For this local listener container,
122         * generally use {@link #setConcurrentConsumers} instead.
123         */
124        @Override
125        public void setConcurrency(String concurrency) {
126                try {
127                        int separatorIndex = concurrency.indexOf('-');
128                        if (separatorIndex != -1) {
129                                setConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
130                        }
131                        else {
132                                setConcurrentConsumers(Integer.parseInt(concurrency));
133                        }
134                }
135                catch (NumberFormatException ex) {
136                        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
137                                        "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " +
138                                        "Note that SimpleMessageListenerContainer will effectively ignore the minimum value and " +
139                                        "always keep a fixed number of consumers according to the maximum value.");
140                }
141        }
142
143        /**
144         * Specify the number of concurrent consumers to create. Default is 1.
145         * <p>Raising the number of concurrent consumers is recommendable in order
146         * to scale the consumption of messages coming in from a queue. However,
147         * note that any ordering guarantees are lost once multiple consumers are
148         * registered. In general, stick with 1 consumer for low-volume queues.
149         * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
150         * This would lead to concurrent consumption of the same message,
151         * which is hardly ever desirable.
152         */
153        public void setConcurrentConsumers(int concurrentConsumers) {
154                Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
155                this.concurrentConsumers = concurrentConsumers;
156        }
157
158        /**
159         * Set the Spring TaskExecutor to use for executing the listener once
160         * a message has been received by the provider.
161         * <p>Default is none, that is, to run in the JMS provider's own receive thread,
162         * blocking the provider's receive endpoint while executing the listener.
163         * <p>Specify a TaskExecutor for executing the listener in a different thread,
164         * rather than blocking the JMS provider, usually integrating with an existing
165         * thread pool. This allows to keep the number of concurrent consumers low (1)
166         * while still processing messages concurrently (decoupled from receiving!).
167         * <p><b>NOTE: Specifying a TaskExecutor for listener execution affects
168         * acknowledgement semantics.</b> Messages will then always get acknowledged
169         * before listener execution, with the underlying Session immediately reused
170         * for receiving the next message. Using this in combination with a transacted
171         * session or with client acknowledgement will lead to unspecified results!
172         * <p><b>NOTE: Concurrent listener execution via a TaskExecutor will lead
173         * to concurrent processing of messages that have been received by the same
174         * underlying Session.</b> As a consequence, it is not recommended to use
175         * this setting with a {@link SessionAwareMessageListener}, at least not
176         * if the latter performs actual work on the given Session. A standard
177         * {@link javax.jms.MessageListener} will work fine, in general.
178         * @see #setConcurrentConsumers
179         * @see org.springframework.core.task.SimpleAsyncTaskExecutor
180         * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
181         */
182        public void setTaskExecutor(Executor taskExecutor) {
183                this.taskExecutor = taskExecutor;
184        }
185
186        @Override
187        protected void validateConfiguration() {
188                super.validateConfiguration();
189                if (isSubscriptionDurable() && this.concurrentConsumers != 1) {
190                        throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription");
191                }
192        }
193
194
195        //-------------------------------------------------------------------------
196        // Implementation of AbstractMessageListenerContainer's template methods
197        //-------------------------------------------------------------------------
198
199        /**
200         * Always use a shared JMS Connection.
201         */
202        @Override
203        protected final boolean sharedConnectionEnabled() {
204                return true;
205        }
206
207        /**
208         * Creates the specified number of concurrent consumers,
209         * in the form of a JMS Session plus associated MessageConsumer.
210         * @see #createListenerConsumer
211         */
212        @Override
213        protected void doInitialize() throws JMSException {
214                if (!this.connectLazily) {
215                        try {
216                                establishSharedConnection();
217                        }
218                        catch (JMSException ex) {
219                                logger.debug("Could not connect on initialization - registering message consumers lazily", ex);
220                                return;
221                        }
222                        initializeConsumers();
223                }
224        }
225
226        /**
227         * Re-initializes this container's JMS message consumers,
228         * if not initialized already.
229         */
230        @Override
231        protected void doStart() throws JMSException {
232                super.doStart();
233                initializeConsumers();
234        }
235
236        /**
237         * Registers this listener container as JMS ExceptionListener on the shared connection.
238         */
239        @Override
240        protected void prepareSharedConnection(Connection connection) throws JMSException {
241                super.prepareSharedConnection(connection);
242                connection.setExceptionListener(this);
243        }
244
245        /**
246         * JMS ExceptionListener implementation, invoked by the JMS provider in
247         * case of connection failures. Re-initializes this listener container's
248         * shared connection and its sessions and consumers, if necessary.
249         * @param ex the reported connection exception
250         * @see #setRecoverOnException
251         * @see #refreshSharedConnection()
252         * @see #initializeConsumers()
253         */
254        @Override
255        public void onException(JMSException ex) {
256                // First invoke the user-specific ExceptionListener, if any.
257                invokeExceptionListener(ex);
258
259                // Now try to recover the shared Connection and all consumers...
260                if (this.recoverOnException) {
261                        if (logger.isDebugEnabled()) {
262                                logger.debug("Trying to recover from JMS Connection exception: " + ex);
263                        }
264                        try {
265                                synchronized (this.consumersMonitor) {
266                                        this.sessions = null;
267                                        this.consumers = null;
268                                }
269                                refreshSharedConnection();
270                                initializeConsumers();
271                                logger.debug("Successfully refreshed JMS Connection");
272                        }
273                        catch (JMSException recoverEx) {
274                                logger.debug("Failed to recover JMS Connection", recoverEx);
275                                logger.error("Encountered non-recoverable JMSException", ex);
276                        }
277                }
278        }
279
280        /**
281         * Initialize the JMS Sessions and MessageConsumers for this container.
282         * @throws JMSException in case of setup failure
283         */
284        protected void initializeConsumers() throws JMSException {
285                // Register Sessions and MessageConsumers.
286                synchronized (this.consumersMonitor) {
287                        if (this.consumers == null) {
288                                this.sessions = new HashSet<>(this.concurrentConsumers);
289                                this.consumers = new HashSet<>(this.concurrentConsumers);
290                                Connection con = getSharedConnection();
291                                for (int i = 0; i < this.concurrentConsumers; i++) {
292                                        Session session = createSession(con);
293                                        MessageConsumer consumer = createListenerConsumer(session);
294                                        this.sessions.add(session);
295                                        this.consumers.add(consumer);
296                                }
297                        }
298                }
299        }
300
301        /**
302         * Create a MessageConsumer for the given JMS Session,
303         * registering a MessageListener for the specified listener.
304         * @param session the JMS Session to work on
305         * @return the MessageConsumer
306         * @throws JMSException if thrown by JMS methods
307         * @see #executeListener
308         */
309        protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
310                Destination destination = getDestination();
311                if (destination == null) {
312                        String destinationName = getDestinationName();
313                        Assert.state(destinationName != null, "No destination set");
314                        destination = resolveDestinationName(session, destinationName);
315                }
316                MessageConsumer consumer = createConsumer(session, destination);
317
318                if (this.taskExecutor != null) {
319                        consumer.setMessageListener(message -> this.taskExecutor.execute(() -> processMessage(message, session)));
320                }
321                else {
322                        consumer.setMessageListener(message -> processMessage(message, session));
323                }
324
325                return consumer;
326        }
327
328        /**
329         * Process a message received from the provider.
330         * <p>Executes the listener, exposing the current JMS Session as
331         * thread-bound resource (if "exposeListenerSession" is "true").
332         * @param message the received JMS Message
333         * @param session the JMS Session to operate on
334         * @see #executeListener
335         * @see #setExposeListenerSession
336         */
337        protected void processMessage(Message message, Session session) {
338                ConnectionFactory connectionFactory = getConnectionFactory();
339                boolean exposeResource = (connectionFactory != null && isExposeListenerSession());
340                if (exposeResource) {
341                        TransactionSynchronizationManager.bindResource(
342                                        connectionFactory, new LocallyExposedJmsResourceHolder(session));
343                }
344                try {
345                        executeListener(session, message);
346                }
347                finally {
348                        if (exposeResource) {
349                                TransactionSynchronizationManager.unbindResource(getConnectionFactory());
350                        }
351                }
352        }
353
354        /**
355         * Destroy the registered JMS Sessions and associated MessageConsumers.
356         */
357        @Override
358        protected void doShutdown() throws JMSException {
359                synchronized (this.consumersMonitor) {
360                        if (this.consumers != null) {
361                                logger.debug("Closing JMS MessageConsumers");
362                                for (MessageConsumer consumer : this.consumers) {
363                                        JmsUtils.closeMessageConsumer(consumer);
364                                }
365                                if (this.sessions != null) {
366                                        logger.debug("Closing JMS Sessions");
367                                        for (Session session : this.sessions) {
368                                                JmsUtils.closeSession(session);
369                                        }
370                                }
371                        }
372                }
373        }
374
375}