001/* 002 * Copyright 2002-2015 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import java.util.HashSet; 020import java.util.Set; 021import java.util.concurrent.Executor; 022import javax.jms.Connection; 023import javax.jms.Destination; 024import javax.jms.ExceptionListener; 025import javax.jms.JMSException; 026import javax.jms.Message; 027import javax.jms.MessageConsumer; 028import javax.jms.MessageListener; 029import javax.jms.Session; 030 031import org.springframework.jms.support.JmsUtils; 032import org.springframework.transaction.support.TransactionSynchronizationManager; 033import org.springframework.util.Assert; 034 035/** 036 * Message listener container that uses the plain JMS client API's 037 * {@code MessageConsumer.setMessageListener()} method to 038 * create concurrent MessageConsumers for the specified listeners. 039 * 040 * <p>This is the simplest form of a message listener container. 041 * It creates a fixed number of JMS Sessions to invoke the listener, 042 * not allowing for dynamic adaptation to runtime demands. Its main 043 * advantage is its low level of complexity and the minimum requirements 044 * on the JMS provider: Not even the ServerSessionPool facility is required. 045 * 046 * <p>See the {@link AbstractMessageListenerContainer} javadoc for details 047 * on acknowledge modes and transaction options. Note that this container 048 * exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode: 049 * that is, automatic message acknowledgment after listener execution, 050 * with no redelivery in case of a user exception thrown but potential 051 * redelivery in case of the JVM dying during listener execution. 052 * 053 * <p>For a different style of MessageListener handling, through looped 054 * {@code MessageConsumer.receive()} calls that also allow for 055 * transactional reception of messages (registering them with XA transactions), 056 * see {@link DefaultMessageListenerContainer}. 057 * 058 * @author Juergen Hoeller 059 * @since 2.0 060 * @see javax.jms.MessageConsumer#setMessageListener 061 * @see DefaultMessageListenerContainer 062 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager 063 */ 064public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements ExceptionListener { 065 066 private boolean connectLazily = false; 067 068 private int concurrentConsumers = 1; 069 070 private Executor taskExecutor; 071 072 private Set<Session> sessions; 073 074 private Set<MessageConsumer> consumers; 075 076 private final Object consumersMonitor = new Object(); 077 078 079 /** 080 * Specify whether to connect lazily, i.e. whether to establish the JMS Connection 081 * and the corresponding Sessions and MessageConsumers as late as possible - 082 * in the start phase of this container. 083 * <p>Default is "false": connecting early, i.e. during the bean initialization phase. 084 * Set this flag to "true" in order to switch to lazy connecting if your target broker 085 * is likely to not have started up yet and you prefer to not even try a connection. 086 * @see #start() 087 * @see #initialize() 088 */ 089 public void setConnectLazily(boolean connectLazily) { 090 this.connectLazily = connectLazily; 091 } 092 093 /** 094 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple 095 * upper limit String, e.g. "10". 096 * <p>This listener container will always hold on to the maximum number of 097 * consumers {@link #setConcurrentConsumers} since it is unable to scale. 098 * <p>This property is primarily supported for configuration compatibility with 099 * {@link DefaultMessageListenerContainer}. For this local listener container, 100 * generally use {@link #setConcurrentConsumers} instead. 101 */ 102 @Override 103 public void setConcurrency(String concurrency) { 104 try { 105 int separatorIndex = concurrency.indexOf('-'); 106 if (separatorIndex != -1) { 107 setConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1))); 108 } 109 else { 110 setConcurrentConsumers(Integer.parseInt(concurrency)); 111 } 112 } 113 catch (NumberFormatException ex) { 114 throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " + 115 "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " + 116 "Note that SimpleMessageListenerContainer will effectively ignore the minimum value and " + 117 "always keep a fixed number of consumers according to the maximum value."); 118 } 119 } 120 121 /** 122 * Specify the number of concurrent consumers to create. Default is 1. 123 * <p>Raising the number of concurrent consumers is recommendable in order 124 * to scale the consumption of messages coming in from a queue. However, 125 * note that any ordering guarantees are lost once multiple consumers are 126 * registered. In general, stick with 1 consumer for low-volume queues. 127 * <p><b>Do not raise the number of concurrent consumers for a topic.</b> 128 * This would lead to concurrent consumption of the same message, 129 * which is hardly ever desirable. 130 */ 131 public void setConcurrentConsumers(int concurrentConsumers) { 132 Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)"); 133 this.concurrentConsumers = concurrentConsumers; 134 } 135 136 /** 137 * Set the Spring TaskExecutor to use for executing the listener once 138 * a message has been received by the provider. 139 * <p>Default is none, that is, to run in the JMS provider's own receive thread, 140 * blocking the provider's receive endpoint while executing the listener. 141 * <p>Specify a TaskExecutor for executing the listener in a different thread, 142 * rather than blocking the JMS provider, usually integrating with an existing 143 * thread pool. This allows to keep the number of concurrent consumers low (1) 144 * while still processing messages concurrently (decoupled from receiving!). 145 * <p><b>NOTE: Specifying a TaskExecutor for listener execution affects 146 * acknowledgement semantics.</b> Messages will then always get acknowledged 147 * before listener execution, with the underlying Session immediately reused 148 * for receiving the next message. Using this in combination with a transacted 149 * session or with client acknowledgement will lead to unspecified results! 150 * <p><b>NOTE: Concurrent listener execution via a TaskExecutor will lead 151 * to concurrent processing of messages that have been received by the same 152 * underlying Session.</b> As a consequence, it is not recommended to use 153 * this setting with a {@link SessionAwareMessageListener}, at least not 154 * if the latter performs actual work on the given Session. A standard 155 * {@link javax.jms.MessageListener} will work fine, in general. 156 * @see #setConcurrentConsumers 157 * @see org.springframework.core.task.SimpleAsyncTaskExecutor 158 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor 159 */ 160 public void setTaskExecutor(Executor taskExecutor) { 161 this.taskExecutor = taskExecutor; 162 } 163 164 @Override 165 protected void validateConfiguration() { 166 super.validateConfiguration(); 167 if (isSubscriptionDurable() && this.concurrentConsumers != 1) { 168 throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription"); 169 } 170 } 171 172 173 //------------------------------------------------------------------------- 174 // Implementation of AbstractMessageListenerContainer's template methods 175 //------------------------------------------------------------------------- 176 177 /** 178 * Always use a shared JMS Connection. 179 */ 180 @Override 181 protected final boolean sharedConnectionEnabled() { 182 return true; 183 } 184 185 /** 186 * Creates the specified number of concurrent consumers, 187 * in the form of a JMS Session plus associated MessageConsumer. 188 * @see #createListenerConsumer 189 */ 190 @Override 191 protected void doInitialize() throws JMSException { 192 if (!this.connectLazily) { 193 try { 194 establishSharedConnection(); 195 } 196 catch (JMSException ex) { 197 logger.debug("Could not connect on initialization - registering message consumers lazily", ex); 198 return; 199 } 200 initializeConsumers(); 201 } 202 } 203 204 /** 205 * Re-initializes this container's JMS message consumers, 206 * if not initialized already. 207 */ 208 @Override 209 protected void doStart() throws JMSException { 210 super.doStart(); 211 initializeConsumers(); 212 } 213 214 /** 215 * Registers this listener container as JMS ExceptionListener on the shared connection. 216 */ 217 @Override 218 protected void prepareSharedConnection(Connection connection) throws JMSException { 219 super.prepareSharedConnection(connection); 220 connection.setExceptionListener(this); 221 } 222 223 /** 224 * JMS ExceptionListener implementation, invoked by the JMS provider in 225 * case of connection failures. Re-initializes this listener container's 226 * shared connection and its sessions and consumers. 227 * @param ex the reported connection exception 228 */ 229 @Override 230 public void onException(JMSException ex) { 231 // First invoke the user-specific ExceptionListener, if any. 232 invokeExceptionListener(ex); 233 234 // Now try to recover the shared Connection and all consumers... 235 if (logger.isInfoEnabled()) { 236 logger.info("Trying to recover from JMS Connection exception: " + ex); 237 } 238 try { 239 synchronized (this.consumersMonitor) { 240 this.sessions = null; 241 this.consumers = null; 242 } 243 refreshSharedConnection(); 244 initializeConsumers(); 245 logger.info("Successfully refreshed JMS Connection"); 246 } 247 catch (JMSException recoverEx) { 248 logger.debug("Failed to recover JMS Connection", recoverEx); 249 logger.error("Encountered non-recoverable JMSException", ex); 250 } 251 } 252 253 /** 254 * Initialize the JMS Sessions and MessageConsumers for this container. 255 * @throws JMSException in case of setup failure 256 */ 257 protected void initializeConsumers() throws JMSException { 258 // Register Sessions and MessageConsumers. 259 synchronized (this.consumersMonitor) { 260 if (this.consumers == null) { 261 this.sessions = new HashSet<Session>(this.concurrentConsumers); 262 this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers); 263 Connection con = getSharedConnection(); 264 for (int i = 0; i < this.concurrentConsumers; i++) { 265 Session session = createSession(con); 266 MessageConsumer consumer = createListenerConsumer(session); 267 this.sessions.add(session); 268 this.consumers.add(consumer); 269 } 270 } 271 } 272 } 273 274 /** 275 * Create a MessageConsumer for the given JMS Session, 276 * registering a MessageListener for the specified listener. 277 * @param session the JMS Session to work on 278 * @return the MessageConsumer 279 * @throws JMSException if thrown by JMS methods 280 * @see #executeListener 281 */ 282 protected MessageConsumer createListenerConsumer(final Session session) throws JMSException { 283 Destination destination = getDestination(); 284 if (destination == null) { 285 destination = resolveDestinationName(session, getDestinationName()); 286 } 287 MessageConsumer consumer = createConsumer(session, destination); 288 289 if (this.taskExecutor != null) { 290 consumer.setMessageListener(new MessageListener() { 291 @Override 292 public void onMessage(final Message message) { 293 taskExecutor.execute(new Runnable() { 294 @Override 295 public void run() { 296 processMessage(message, session); 297 } 298 }); 299 } 300 }); 301 } 302 else { 303 consumer.setMessageListener(new MessageListener() { 304 @Override 305 public void onMessage(Message message) { 306 processMessage(message, session); 307 } 308 }); 309 } 310 311 return consumer; 312 } 313 314 /** 315 * Process a message received from the provider. 316 * <p>Executes the listener, exposing the current JMS Session as 317 * thread-bound resource (if "exposeListenerSession" is "true"). 318 * @param message the received JMS Message 319 * @param session the JMS Session to operate on 320 * @see #executeListener 321 * @see #setExposeListenerSession 322 */ 323 protected void processMessage(Message message, Session session) { 324 boolean exposeResource = isExposeListenerSession(); 325 if (exposeResource) { 326 TransactionSynchronizationManager.bindResource( 327 getConnectionFactory(), new LocallyExposedJmsResourceHolder(session)); 328 } 329 try { 330 executeListener(session, message); 331 } 332 finally { 333 if (exposeResource) { 334 TransactionSynchronizationManager.unbindResource(getConnectionFactory()); 335 } 336 } 337 } 338 339 /** 340 * Destroy the registered JMS Sessions and associated MessageConsumers. 341 */ 342 @Override 343 protected void doShutdown() throws JMSException { 344 synchronized (this.consumersMonitor) { 345 if (this.consumers != null) { 346 logger.debug("Closing JMS MessageConsumers"); 347 for (MessageConsumer consumer : this.consumers) { 348 JmsUtils.closeMessageConsumer(consumer); 349 } 350 logger.debug("Closing JMS Sessions"); 351 for (Session session : this.sessions) { 352 JmsUtils.closeSession(session); 353 } 354 } 355 } 356 } 357 358}