001/*
002 * Copyright 2002-2015 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.jms.listener;
018
019import java.util.HashSet;
020import java.util.Set;
021import java.util.concurrent.Executor;
022import javax.jms.Connection;
023import javax.jms.Destination;
024import javax.jms.ExceptionListener;
025import javax.jms.JMSException;
026import javax.jms.Message;
027import javax.jms.MessageConsumer;
028import javax.jms.MessageListener;
029import javax.jms.Session;
030
031import org.springframework.jms.support.JmsUtils;
032import org.springframework.transaction.support.TransactionSynchronizationManager;
033import org.springframework.util.Assert;
034
035/**
036 * Message listener container that uses the plain JMS client API's
037 * {@code MessageConsumer.setMessageListener()} method to
038 * create concurrent MessageConsumers for the specified listeners.
039 *
040 * <p>This is the simplest form of a message listener container.
041 * It creates a fixed number of JMS Sessions to invoke the listener,
042 * not allowing for dynamic adaptation to runtime demands. Its main
043 * advantage is its low level of complexity and the minimum requirements
044 * on the JMS provider: Not even the ServerSessionPool facility is required.
045 *
046 * <p>See the {@link AbstractMessageListenerContainer} javadoc for details
047 * on acknowledge modes and transaction options. Note that this container
048 * exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode:
049 * that is, automatic message acknowledgment after listener execution,
050 * with no redelivery in case of a user exception thrown but potential
051 * redelivery in case of the JVM dying during listener execution.
052 *
053 * <p>For a different style of MessageListener handling, through looped
054 * {@code MessageConsumer.receive()} calls that also allow for
055 * transactional reception of messages (registering them with XA transactions),
056 * see {@link DefaultMessageListenerContainer}.
057 *
058 * @author Juergen Hoeller
059 * @since 2.0
060 * @see javax.jms.MessageConsumer#setMessageListener
061 * @see DefaultMessageListenerContainer
062 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager
063 */
064public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements ExceptionListener {
065
066        private boolean connectLazily = false;
067
068        private int concurrentConsumers = 1;
069
070        private Executor taskExecutor;
071
072        private Set<Session> sessions;
073
074        private Set<MessageConsumer> consumers;
075
076        private final Object consumersMonitor = new Object();
077
078
079        /**
080         * Specify whether to connect lazily, i.e. whether to establish the JMS Connection
081         * and the corresponding Sessions and MessageConsumers as late as possible -
082         * in the start phase of this container.
083         * <p>Default is "false": connecting early, i.e. during the bean initialization phase.
084         * Set this flag to "true" in order to switch to lazy connecting if your target broker
085         * is likely to not have started up yet and you prefer to not even try a connection.
086         * @see #start()
087         * @see #initialize()
088         */
089        public void setConnectLazily(boolean connectLazily) {
090                this.connectLazily = connectLazily;
091        }
092
093        /**
094         * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
095         * upper limit String, e.g. "10".
096         * <p>This listener container will always hold on to the maximum number of
097         * consumers {@link #setConcurrentConsumers} since it is unable to scale.
098         * <p>This property is primarily supported for configuration compatibility with
099         * {@link DefaultMessageListenerContainer}. For this local listener container,
100         * generally use {@link #setConcurrentConsumers} instead.
101         */
102        @Override
103        public void setConcurrency(String concurrency) {
104                try {
105                        int separatorIndex = concurrency.indexOf('-');
106                        if (separatorIndex != -1) {
107                                setConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
108                        }
109                        else {
110                                setConcurrentConsumers(Integer.parseInt(concurrency));
111                        }
112                }
113                catch (NumberFormatException ex) {
114                        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
115                                        "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " +
116                                        "Note that SimpleMessageListenerContainer will effectively ignore the minimum value and " +
117                                        "always keep a fixed number of consumers according to the maximum value.");
118                }
119        }
120
121        /**
122         * Specify the number of concurrent consumers to create. Default is 1.
123         * <p>Raising the number of concurrent consumers is recommendable in order
124         * to scale the consumption of messages coming in from a queue. However,
125         * note that any ordering guarantees are lost once multiple consumers are
126         * registered. In general, stick with 1 consumer for low-volume queues.
127         * <p><b>Do not raise the number of concurrent consumers for a topic.</b>
128         * This would lead to concurrent consumption of the same message,
129         * which is hardly ever desirable.
130         */
131        public void setConcurrentConsumers(int concurrentConsumers) {
132                Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)");
133                this.concurrentConsumers = concurrentConsumers;
134        }
135
136        /**
137         * Set the Spring TaskExecutor to use for executing the listener once
138         * a message has been received by the provider.
139         * <p>Default is none, that is, to run in the JMS provider's own receive thread,
140         * blocking the provider's receive endpoint while executing the listener.
141         * <p>Specify a TaskExecutor for executing the listener in a different thread,
142         * rather than blocking the JMS provider, usually integrating with an existing
143         * thread pool. This allows to keep the number of concurrent consumers low (1)
144         * while still processing messages concurrently (decoupled from receiving!).
145         * <p><b>NOTE: Specifying a TaskExecutor for listener execution affects
146         * acknowledgement semantics.</b> Messages will then always get acknowledged
147         * before listener execution, with the underlying Session immediately reused
148         * for receiving the next message. Using this in combination with a transacted
149         * session or with client acknowledgement will lead to unspecified results!
150         * <p><b>NOTE: Concurrent listener execution via a TaskExecutor will lead
151         * to concurrent processing of messages that have been received by the same
152         * underlying Session.</b> As a consequence, it is not recommended to use
153         * this setting with a {@link SessionAwareMessageListener}, at least not
154         * if the latter performs actual work on the given Session. A standard
155         * {@link javax.jms.MessageListener} will work fine, in general.
156         * @see #setConcurrentConsumers
157         * @see org.springframework.core.task.SimpleAsyncTaskExecutor
158         * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
159         */
160        public void setTaskExecutor(Executor taskExecutor) {
161                this.taskExecutor = taskExecutor;
162        }
163
164        @Override
165        protected void validateConfiguration() {
166                super.validateConfiguration();
167                if (isSubscriptionDurable() && this.concurrentConsumers != 1) {
168                        throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription");
169                }
170        }
171
172
173        //-------------------------------------------------------------------------
174        // Implementation of AbstractMessageListenerContainer's template methods
175        //-------------------------------------------------------------------------
176
177        /**
178         * Always use a shared JMS Connection.
179         */
180        @Override
181        protected final boolean sharedConnectionEnabled() {
182                return true;
183        }
184
185        /**
186         * Creates the specified number of concurrent consumers,
187         * in the form of a JMS Session plus associated MessageConsumer.
188         * @see #createListenerConsumer
189         */
190        @Override
191        protected void doInitialize() throws JMSException {
192                if (!this.connectLazily) {
193                        try {
194                                establishSharedConnection();
195                        }
196                        catch (JMSException ex) {
197                                logger.debug("Could not connect on initialization - registering message consumers lazily", ex);
198                                return;
199                        }
200                        initializeConsumers();
201                }
202        }
203
204        /**
205         * Re-initializes this container's JMS message consumers,
206         * if not initialized already.
207         */
208        @Override
209        protected void doStart() throws JMSException {
210                super.doStart();
211                initializeConsumers();
212        }
213
214        /**
215         * Registers this listener container as JMS ExceptionListener on the shared connection.
216         */
217        @Override
218        protected void prepareSharedConnection(Connection connection) throws JMSException {
219                super.prepareSharedConnection(connection);
220                connection.setExceptionListener(this);
221        }
222
223        /**
224         * JMS ExceptionListener implementation, invoked by the JMS provider in
225         * case of connection failures. Re-initializes this listener container's
226         * shared connection and its sessions and consumers.
227         * @param ex the reported connection exception
228         */
229        @Override
230        public void onException(JMSException ex) {
231                // First invoke the user-specific ExceptionListener, if any.
232                invokeExceptionListener(ex);
233
234                // Now try to recover the shared Connection and all consumers...
235                if (logger.isInfoEnabled()) {
236                        logger.info("Trying to recover from JMS Connection exception: " + ex);
237                }
238                try {
239                        synchronized (this.consumersMonitor) {
240                                this.sessions = null;
241                                this.consumers = null;
242                        }
243                        refreshSharedConnection();
244                        initializeConsumers();
245                        logger.info("Successfully refreshed JMS Connection");
246                }
247                catch (JMSException recoverEx) {
248                        logger.debug("Failed to recover JMS Connection", recoverEx);
249                        logger.error("Encountered non-recoverable JMSException", ex);
250                }
251        }
252
253        /**
254         * Initialize the JMS Sessions and MessageConsumers for this container.
255         * @throws JMSException in case of setup failure
256         */
257        protected void initializeConsumers() throws JMSException {
258                // Register Sessions and MessageConsumers.
259                synchronized (this.consumersMonitor) {
260                        if (this.consumers == null) {
261                                this.sessions = new HashSet<Session>(this.concurrentConsumers);
262                                this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers);
263                                Connection con = getSharedConnection();
264                                for (int i = 0; i < this.concurrentConsumers; i++) {
265                                        Session session = createSession(con);
266                                        MessageConsumer consumer = createListenerConsumer(session);
267                                        this.sessions.add(session);
268                                        this.consumers.add(consumer);
269                                }
270                        }
271                }
272        }
273
274        /**
275         * Create a MessageConsumer for the given JMS Session,
276         * registering a MessageListener for the specified listener.
277         * @param session the JMS Session to work on
278         * @return the MessageConsumer
279         * @throws JMSException if thrown by JMS methods
280         * @see #executeListener
281         */
282        protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
283                Destination destination = getDestination();
284                if (destination == null) {
285                        destination = resolveDestinationName(session, getDestinationName());
286                }
287                MessageConsumer consumer = createConsumer(session, destination);
288
289                if (this.taskExecutor != null) {
290                        consumer.setMessageListener(new MessageListener() {
291                                @Override
292                                public void onMessage(final Message message) {
293                                        taskExecutor.execute(new Runnable() {
294                                                @Override
295                                                public void run() {
296                                                        processMessage(message, session);
297                                                }
298                                        });
299                                }
300                        });
301                }
302                else {
303                        consumer.setMessageListener(new MessageListener() {
304                                @Override
305                                public void onMessage(Message message) {
306                                        processMessage(message, session);
307                                }
308                        });
309                }
310
311                return consumer;
312        }
313
314        /**
315         * Process a message received from the provider.
316         * <p>Executes the listener, exposing the current JMS Session as
317         * thread-bound resource (if "exposeListenerSession" is "true").
318         * @param message the received JMS Message
319         * @param session the JMS Session to operate on
320         * @see #executeListener
321         * @see #setExposeListenerSession
322         */
323        protected void processMessage(Message message, Session session) {
324                boolean exposeResource = isExposeListenerSession();
325                if (exposeResource) {
326                        TransactionSynchronizationManager.bindResource(
327                                        getConnectionFactory(), new LocallyExposedJmsResourceHolder(session));
328                }
329                try {
330                        executeListener(session, message);
331                }
332                finally {
333                        if (exposeResource) {
334                                TransactionSynchronizationManager.unbindResource(getConnectionFactory());
335                        }
336                }
337        }
338
339        /**
340         * Destroy the registered JMS Sessions and associated MessageConsumers.
341         */
342        @Override
343        protected void doShutdown() throws JMSException {
344                synchronized (this.consumersMonitor) {
345                        if (this.consumers != null) {
346                                logger.debug("Closing JMS MessageConsumers");
347                                for (MessageConsumer consumer : this.consumers) {
348                                        JmsUtils.closeMessageConsumer(consumer);
349                                }
350                                logger.debug("Closing JMS Sessions");
351                                for (Session session : this.sessions) {
352                                        JmsUtils.closeSession(session);
353                                }
354                        }
355                }
356        }
357
358}