001/* 002 * Copyright 2002-2020 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.jms.listener; 018 019import java.util.HashSet; 020import java.util.Set; 021import java.util.concurrent.Executor; 022 023import javax.jms.Connection; 024import javax.jms.ConnectionFactory; 025import javax.jms.Destination; 026import javax.jms.ExceptionListener; 027import javax.jms.JMSException; 028import javax.jms.Message; 029import javax.jms.MessageConsumer; 030import javax.jms.Session; 031 032import org.springframework.jms.support.JmsUtils; 033import org.springframework.lang.Nullable; 034import org.springframework.transaction.support.TransactionSynchronizationManager; 035import org.springframework.util.Assert; 036 037/** 038 * Message listener container that uses the plain JMS client API's 039 * {@code MessageConsumer.setMessageListener()} method to 040 * create concurrent MessageConsumers for the specified listeners. 041 * 042 * <p>This is the simplest form of a message listener container. 043 * It creates a fixed number of JMS Sessions to invoke the listener, 044 * not allowing for dynamic adaptation to runtime demands. Its main 045 * advantage is its low level of complexity and the minimum requirements 046 * on the JMS provider: Not even the ServerSessionPool facility is required. 047 * 048 * <p>See the {@link AbstractMessageListenerContainer} javadoc for details 049 * on acknowledge modes and transaction options. Note that this container 050 * exposes standard JMS behavior for the default "AUTO_ACKNOWLEDGE" mode: 051 * that is, automatic message acknowledgment after listener execution, 052 * with no redelivery in case of a user exception thrown but potential 053 * redelivery in case of the JVM dying during listener execution. 054 * 055 * <p>For a different style of MessageListener handling, through looped 056 * {@code MessageConsumer.receive()} calls that also allow for 057 * transactional reception of messages (registering them with XA transactions), 058 * see {@link DefaultMessageListenerContainer}. 059 * 060 * @author Juergen Hoeller 061 * @since 2.0 062 * @see javax.jms.MessageConsumer#setMessageListener 063 * @see DefaultMessageListenerContainer 064 * @see org.springframework.jms.listener.endpoint.JmsMessageEndpointManager 065 */ 066public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer implements ExceptionListener { 067 068 private boolean connectLazily = false; 069 070 private boolean recoverOnException = true; 071 072 private int concurrentConsumers = 1; 073 074 @Nullable 075 private Executor taskExecutor; 076 077 @Nullable 078 private Set<Session> sessions; 079 080 @Nullable 081 private Set<MessageConsumer> consumers; 082 083 private final Object consumersMonitor = new Object(); 084 085 086 /** 087 * Specify whether to connect lazily, i.e. whether to establish the JMS Connection 088 * and the corresponding Sessions and MessageConsumers as late as possible - 089 * in the start phase of this container. 090 * <p>Default is "false": connecting early, i.e. during the bean initialization phase. 091 * Set this flag to "true" in order to switch to lazy connecting if your target broker 092 * is likely to not have started up yet and you prefer to not even try a connection. 093 * @see #start() 094 * @see #initialize() 095 */ 096 public void setConnectLazily(boolean connectLazily) { 097 this.connectLazily = connectLazily; 098 } 099 100 /** 101 * Specify whether to explicitly recover the shared JMS Connection and the 102 * associated Sessions and MessageConsumers whenever a JMSException is reported. 103 * <p>Default is "true": refreshing the shared connection and re-initializing the 104 * consumers whenever the connection propagates an exception to its listener. 105 * Set this flag to "false" in order to rely on automatic recovery within the 106 * provider, holding on to the existing connection and consumer handles. 107 * @since 5.1.8 108 * @see #onException(JMSException) 109 * @see Connection#setExceptionListener 110 */ 111 public void setRecoverOnException(boolean recoverOnException) { 112 this.recoverOnException = recoverOnException; 113 } 114 115 /** 116 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple 117 * upper limit String, e.g. "10". 118 * <p>This listener container will always hold on to the maximum number of 119 * consumers {@link #setConcurrentConsumers} since it is unable to scale. 120 * <p>This property is primarily supported for configuration compatibility with 121 * {@link DefaultMessageListenerContainer}. For this local listener container, 122 * generally use {@link #setConcurrentConsumers} instead. 123 */ 124 @Override 125 public void setConcurrency(String concurrency) { 126 try { 127 int separatorIndex = concurrency.indexOf('-'); 128 if (separatorIndex != -1) { 129 setConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1))); 130 } 131 else { 132 setConcurrentConsumers(Integer.parseInt(concurrency)); 133 } 134 } 135 catch (NumberFormatException ex) { 136 throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " + 137 "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " + 138 "Note that SimpleMessageListenerContainer will effectively ignore the minimum value and " + 139 "always keep a fixed number of consumers according to the maximum value."); 140 } 141 } 142 143 /** 144 * Specify the number of concurrent consumers to create. Default is 1. 145 * <p>Raising the number of concurrent consumers is recommendable in order 146 * to scale the consumption of messages coming in from a queue. However, 147 * note that any ordering guarantees are lost once multiple consumers are 148 * registered. In general, stick with 1 consumer for low-volume queues. 149 * <p><b>Do not raise the number of concurrent consumers for a topic.</b> 150 * This would lead to concurrent consumption of the same message, 151 * which is hardly ever desirable. 152 */ 153 public void setConcurrentConsumers(int concurrentConsumers) { 154 Assert.isTrue(concurrentConsumers > 0, "'concurrentConsumers' value must be at least 1 (one)"); 155 this.concurrentConsumers = concurrentConsumers; 156 } 157 158 /** 159 * Set the Spring TaskExecutor to use for executing the listener once 160 * a message has been received by the provider. 161 * <p>Default is none, that is, to run in the JMS provider's own receive thread, 162 * blocking the provider's receive endpoint while executing the listener. 163 * <p>Specify a TaskExecutor for executing the listener in a different thread, 164 * rather than blocking the JMS provider, usually integrating with an existing 165 * thread pool. This allows to keep the number of concurrent consumers low (1) 166 * while still processing messages concurrently (decoupled from receiving!). 167 * <p><b>NOTE: Specifying a TaskExecutor for listener execution affects 168 * acknowledgement semantics.</b> Messages will then always get acknowledged 169 * before listener execution, with the underlying Session immediately reused 170 * for receiving the next message. Using this in combination with a transacted 171 * session or with client acknowledgement will lead to unspecified results! 172 * <p><b>NOTE: Concurrent listener execution via a TaskExecutor will lead 173 * to concurrent processing of messages that have been received by the same 174 * underlying Session.</b> As a consequence, it is not recommended to use 175 * this setting with a {@link SessionAwareMessageListener}, at least not 176 * if the latter performs actual work on the given Session. A standard 177 * {@link javax.jms.MessageListener} will work fine, in general. 178 * @see #setConcurrentConsumers 179 * @see org.springframework.core.task.SimpleAsyncTaskExecutor 180 * @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor 181 */ 182 public void setTaskExecutor(Executor taskExecutor) { 183 this.taskExecutor = taskExecutor; 184 } 185 186 @Override 187 protected void validateConfiguration() { 188 super.validateConfiguration(); 189 if (isSubscriptionDurable() && this.concurrentConsumers != 1) { 190 throw new IllegalArgumentException("Only 1 concurrent consumer supported for durable subscription"); 191 } 192 } 193 194 195 //------------------------------------------------------------------------- 196 // Implementation of AbstractMessageListenerContainer's template methods 197 //------------------------------------------------------------------------- 198 199 /** 200 * Always use a shared JMS Connection. 201 */ 202 @Override 203 protected final boolean sharedConnectionEnabled() { 204 return true; 205 } 206 207 /** 208 * Creates the specified number of concurrent consumers, 209 * in the form of a JMS Session plus associated MessageConsumer. 210 * @see #createListenerConsumer 211 */ 212 @Override 213 protected void doInitialize() throws JMSException { 214 if (!this.connectLazily) { 215 try { 216 establishSharedConnection(); 217 } 218 catch (JMSException ex) { 219 logger.debug("Could not connect on initialization - registering message consumers lazily", ex); 220 return; 221 } 222 initializeConsumers(); 223 } 224 } 225 226 /** 227 * Re-initializes this container's JMS message consumers, 228 * if not initialized already. 229 */ 230 @Override 231 protected void doStart() throws JMSException { 232 super.doStart(); 233 initializeConsumers(); 234 } 235 236 /** 237 * Registers this listener container as JMS ExceptionListener on the shared connection. 238 */ 239 @Override 240 protected void prepareSharedConnection(Connection connection) throws JMSException { 241 super.prepareSharedConnection(connection); 242 connection.setExceptionListener(this); 243 } 244 245 /** 246 * JMS ExceptionListener implementation, invoked by the JMS provider in 247 * case of connection failures. Re-initializes this listener container's 248 * shared connection and its sessions and consumers, if necessary. 249 * @param ex the reported connection exception 250 * @see #setRecoverOnException 251 * @see #refreshSharedConnection() 252 * @see #initializeConsumers() 253 */ 254 @Override 255 public void onException(JMSException ex) { 256 // First invoke the user-specific ExceptionListener, if any. 257 invokeExceptionListener(ex); 258 259 // Now try to recover the shared Connection and all consumers... 260 if (this.recoverOnException) { 261 if (logger.isDebugEnabled()) { 262 logger.debug("Trying to recover from JMS Connection exception: " + ex); 263 } 264 try { 265 synchronized (this.consumersMonitor) { 266 this.sessions = null; 267 this.consumers = null; 268 } 269 refreshSharedConnection(); 270 initializeConsumers(); 271 logger.debug("Successfully refreshed JMS Connection"); 272 } 273 catch (JMSException recoverEx) { 274 logger.debug("Failed to recover JMS Connection", recoverEx); 275 logger.error("Encountered non-recoverable JMSException", ex); 276 } 277 } 278 } 279 280 /** 281 * Initialize the JMS Sessions and MessageConsumers for this container. 282 * @throws JMSException in case of setup failure 283 */ 284 protected void initializeConsumers() throws JMSException { 285 // Register Sessions and MessageConsumers. 286 synchronized (this.consumersMonitor) { 287 if (this.consumers == null) { 288 this.sessions = new HashSet<>(this.concurrentConsumers); 289 this.consumers = new HashSet<>(this.concurrentConsumers); 290 Connection con = getSharedConnection(); 291 for (int i = 0; i < this.concurrentConsumers; i++) { 292 Session session = createSession(con); 293 MessageConsumer consumer = createListenerConsumer(session); 294 this.sessions.add(session); 295 this.consumers.add(consumer); 296 } 297 } 298 } 299 } 300 301 /** 302 * Create a MessageConsumer for the given JMS Session, 303 * registering a MessageListener for the specified listener. 304 * @param session the JMS Session to work on 305 * @return the MessageConsumer 306 * @throws JMSException if thrown by JMS methods 307 * @see #executeListener 308 */ 309 protected MessageConsumer createListenerConsumer(final Session session) throws JMSException { 310 Destination destination = getDestination(); 311 if (destination == null) { 312 String destinationName = getDestinationName(); 313 Assert.state(destinationName != null, "No destination set"); 314 destination = resolveDestinationName(session, destinationName); 315 } 316 MessageConsumer consumer = createConsumer(session, destination); 317 318 if (this.taskExecutor != null) { 319 consumer.setMessageListener(message -> this.taskExecutor.execute(() -> processMessage(message, session))); 320 } 321 else { 322 consumer.setMessageListener(message -> processMessage(message, session)); 323 } 324 325 return consumer; 326 } 327 328 /** 329 * Process a message received from the provider. 330 * <p>Executes the listener, exposing the current JMS Session as 331 * thread-bound resource (if "exposeListenerSession" is "true"). 332 * @param message the received JMS Message 333 * @param session the JMS Session to operate on 334 * @see #executeListener 335 * @see #setExposeListenerSession 336 */ 337 protected void processMessage(Message message, Session session) { 338 ConnectionFactory connectionFactory = getConnectionFactory(); 339 boolean exposeResource = (connectionFactory != null && isExposeListenerSession()); 340 if (exposeResource) { 341 TransactionSynchronizationManager.bindResource( 342 connectionFactory, new LocallyExposedJmsResourceHolder(session)); 343 } 344 try { 345 executeListener(session, message); 346 } 347 finally { 348 if (exposeResource) { 349 TransactionSynchronizationManager.unbindResource(getConnectionFactory()); 350 } 351 } 352 } 353 354 /** 355 * Destroy the registered JMS Sessions and associated MessageConsumers. 356 */ 357 @Override 358 protected void doShutdown() throws JMSException { 359 synchronized (this.consumersMonitor) { 360 if (this.consumers != null) { 361 logger.debug("Closing JMS MessageConsumers"); 362 for (MessageConsumer consumer : this.consumers) { 363 JmsUtils.closeMessageConsumer(consumer); 364 } 365 if (this.sessions != null) { 366 logger.debug("Closing JMS Sessions"); 367 for (Session session : this.sessions) { 368 JmsUtils.closeSession(session); 369 } 370 } 371 } 372 } 373 } 374 375}