3. Reference
This part of the reference documentation details the various components that comprise Spring AMQP. The main chapter covers the core classes to develop an AMQP application. This part also includes a chapter about the sample applications.
In this chapter, we will explore the interfaces and classes that are the essential components for developing applications with Spring AMQP.
Spring AMQP consists of a handful of modules, each represented by a JAR in the distribution. These modules are: spring-amqp, and spring-rabbit. The spring-amqp module contains the org.springframework.amqp.core
package. Within that package, you will find the classes that represent the core AMQP "model". Our intention is to provide generic abstractions that do not rely on any particular AMQP broker implementation or client library. End user code will be more portable across vendor implementations as it can be developed against the abstraction layer only. These abstractions are then used implemented by broker-specific modules, such as spring-rabbit. There is currently only a RabbitMQ implementation; however the abstractions have been validated in .NET using Apache Qpid in addition to RabbitMQ. Since AMQP operates at the protocol level in principle, the RabbitMQ client can be used with any broker that supports the same protocol version, but we do not test any other brokers at present.
The overview here assumes that you are already familiar with the basics of the AMQP specification. If you are not, then have a look at the resources listed in Chapter 5, Other Resources
The 0-9-1 AMQP specification does not define a Message class or interface. Instead, when performing an operation such as basicPublish()
, the content is passed as a byte-array argument and additional properties are passed in as separate arguments. Spring AMQP defines a Message class as part of a more general AMQP domain model representation. The purpose of the Message class is to simply encapsulate the body and properties within a single instance so that the API can in turn be simpler. The Message class definition is quite straightforward.
public class Message { private final MessageProperties messageProperties; private final byte[] body; public Message(byte[] body, MessageProperties messageProperties) { this.body = body; this.messageProperties = messageProperties; } public byte[] getBody() { return this.body; } public MessageProperties getMessageProperties() { return this.messageProperties; } }
The MessageProperties
interface defines several common properties such as messageId, timestamp, contentType, and several more. Those properties can also be extended with user-defined headers by calling the setHeader(String key, Object value)
method.
Important | |
---|---|
Starting with versions |
The Exchange
interface represents an AMQP Exchange, which is what a Message Producer sends to. Each Exchange within a virtual host of a broker will have a unique name as well as a few other properties:
public interface Exchange { String getName(); String getExchangeType(); boolean isDurable(); boolean isAutoDelete(); Map<String, Object> getArguments(); }
As you can see, an Exchange also has a type represented by constants defined in ExchangeTypes
. The basic types are: Direct
, Topic
, Fanout
, and Headers
. In the core package you will find implementations of the Exchange
interface for each of those types. The behavior varies across these Exchange types in terms of how they handle bindings to Queues. For example, a Direct exchange allows for a Queue to be bound by a fixed routing key (often the Queue’s name). A Topic exchange supports bindings with routing patterns that may include the * and # wildcards for exactly-one and zero-or-more, respectively. The Fanout exchange publishes to all Queues that are bound to it without taking any routing key into consideration. For much more information about these and the other Exchange types, check out Chapter 5, Other Resources.
Note | |
---|---|
The AMQP specification also requires that any broker provide a "default" Direct Exchange that has no name. All Queues that are declared will be bound to that default Exchange with their names as routing keys. You will learn more about the default Exchange’s usage within Spring AMQP in Section 3.1.4, “AmqpTemplate”. |
The Queue
class represents the component from which a Message Consumer receives Messages. Like the various Exchange classes, our implementation is intended to be an abstract representation of this core AMQP type.
public class Queue { private final String name; private volatile boolean durable; private volatile boolean exclusive; private volatile boolean autoDelete; private volatile Map<String, Object> arguments; /** * The queue is durable, non-exclusive and non auto-delete. * * @param name the name of the queue. */ public Queue(String name) { this(name, true, false, false); } // Getters and Setters omitted for brevity }
Notice that the constructor takes the Queue name. Depending on the implementation, the admin template may provide methods for generating a uniquely named Queue. Such Queues can be useful as a "reply-to" address or other temporary situations. For that reason, the exclusive and autoDelete properties of an auto-generated Queue would both be set to true.
Note | |
---|---|
See the section on queues in Section 3.1.11, “Configuring the broker” for information about declaring queues using namespace support, including queue arguments. |
Given that a producer sends to an Exchange and a consumer receives from a Queue, the bindings that connect Queues to Exchanges are critical for connecting those producers and consumers via messaging. In Spring AMQP, we define a Binding
class to represent those connections. Let’s review the basic options for binding Queues to Exchanges.
You can bind a Queue to a DirectExchange with a fixed routing key.
new Binding(someQueue, someDirectExchange, "foo.bar")
You can bind a Queue to a TopicExchange with a routing pattern.
new Binding(someQueue, someTopicExchange, "foo.*")
You can bind a Queue to a FanoutExchange with no routing key.
new Binding(someQueue, someFanoutExchange)
We also provide a BindingBuilder
to facilitate a "fluent API" style.
Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
Note | |
---|---|
The BindingBuilder class is shown above for clarity, but this style works well when using a static import for the bind() method. |
By itself, an instance of the Binding class is just holding the data about a connection. In other words, it is not an "active" component. However, as you will see later in Section 3.1.11, “Configuring the broker”, Binding instances can be used by the AmqpAdmin
class to actually trigger the binding actions on the broker. Also, as you will see in that same section, the Binding instances can be defined using Spring’s @Bean
-style within @Configuration
classes. There is also a convenient base class which further simplifies that approach for generating AMQP-related bean definitions and recognizes the Queues, Exchanges, and Bindings so that they will all be declared on the AMQP broker upon application startup.
The AmqpTemplate
is also defined within the core package. As one of the main components involved in actual AMQP messaging, it is discussed in detail in its own section (see Section 3.1.4, “AmqpTemplate”).
Whereas the AMQP model we described in the previous section is generic and applicable to all implementations, when we get into the management of resources, the details are specific to the broker implementation. Therefore, in this section, we will be focusing on code that exists only within our "spring-rabbit" module since at this point, RabbitMQ is the only supported implementation.
The central component for managing a connection to the RabbitMQ broker is the ConnectionFactory
interface. The responsibility of a ConnectionFactory
implementation is to provide an instance of org.springframework.amqp.rabbit.connection.Connection
which is a wrapper for com.rabbitmq.client.Connection
. The only concrete implementation we provide is CachingConnectionFactory
which, by default, establishes a single connection proxy that can be shared by the application. Sharing of the connection is possible since the "unit of work" for messaging with AMQP is actually a "channel" (in some ways, this is similar to the relationship between a Connection and a Session in JMS). As you can imagine, the connection instance provides a createChannel
method. The CachingConnectionFactory
implementation supports caching of those channels, and it maintains separate caches for channels based on whether they are transactional or not. When creating an instance of CachingConnectionFactory
, the hostname can be provided via the constructor. The username and password properties should be provided as well. If you would like to configure the size of the channel cache (the default is 25), you could call the setChannelCacheSize()
method here as well.
Starting with version 1.3, the CachingConnectionFactory
can be configured to cache connections as well as just channels. In this case, each call to createConnection()
creates a new connection (or retrieves an idle one from the cache). Closing a connection returns it to the cache (if the cache size has not been reached). Channels created on such connections are cached too. The use of separate connections might be useful in some environments, such as consuming from an HA cluster, in conjunction with a load balancer, to connect to different cluster members. Set the cacheMode
to CacheMode.CONNECTION
.
Note | |
---|---|
This does not limit the number of connections, it specifies how many idle open connections are allowed. |
Starting with version 1.5.5, a new property connectionLimit
is provided. When this is set, it limits the total number of connections allowed. When set, if the limit is reached, the channelCheckoutTimeLimit
is used to wait for a connection to become idle. If the time is exceeded, an AmqpTimeoutException
is thrown.
Important | |
---|---|
When the cache mode is Also, at the time of writing, the |
It is important to understand that the cache size is (by default) not a limit, but merely the number of channels that can be cached. With a cache size of, say, 10, any number of channels can actually be in use. If more than 10 channels are being used and they are all returned to the cache, 10 will go in the cache; the remainder will be physically closed.
Starting with version 1.6, the default channel cache size has been increased from 1 to 25. In high volume, multi-threaded, environments, a small cache means that channels are created and closed at a high rate. Increasing the default cache size will avoid this overhead. You should monitor the channels in use via the RabbitMQ Admin UI and consider increasing the cache size further if you see many channels being created and closed. The cache will only grow on-demand (to suit the concurrency requirements of the application) so this change will not impact existing low-volume applications.
Starting with version 1.4.2, the CachingConnectionFactory
has a property channelCheckoutTimeout
. When this property is greater than zero, the channelCacheSize
becomes a limit on the number of channels that can be created on a connection. If the limit is reached, calling threads will block until a channel is available or this timeout is reached, in which case a AmqpTimeoutException
is thrown.
Warning | |
---|---|
Channels used within the framework (e.g. |
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); Connection connection = connectionFactory.createConnection();
When using XML, the configuration might look like this:
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="somehost"/> <property name="username" value="guest"/> <property name="password" value="guest"/> </bean>
Note | |
---|---|
There is also a |
A ConnectionFactory
can be created quickly and conveniently using the rabbit namespace:
<rabbit:connection-factory id="connectionFactory"/>
In most cases this will be preferable since the framework can choose the best defaults for you. The created instance will be a CachingConnectionFactory
. Keep in mind that the default cache size for channels is 25. If you want more channels to be cached set a larger value via the channelCacheSize property. In XML it would look like this:
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> <constructor-arg value="somehost"/> <property name="username" value="guest"/> <property name="password" value="guest"/> <property name="channelCacheSize" value="50"/> </bean>
And with the namespace you can just add the channel-cache-size attribute:
<rabbit:connection-factory id="connectionFactory" channel-cache-size="50"/>
The default cache mode is CHANNEL, but you can configure it to cache connections instead; in this case, we use connection-cache-size
:
<rabbit:connection-factory id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
Host and port attributes can be provided using the namespace
<rabbit:connection-factory id="connectionFactory" host="somehost" port="5672"/>
Alternatively, if running in a clustered environment, use the addresses attribute.
<rabbit:connection-factory id="connectionFactory" addresses="host1:5672,host2:5672"/>
Here’s an example with a custom thread factory that prefixes thread names with rabbitmq-
.
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567" thread-factory="tf" channel-cache-size="10" username="user" password="password" /> <bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory"> <constructor-arg value="rabbitmq-" /> </bean>
Starting with version 1.7 a ConnectionNameStrategy
is provided for the injection into the AbstractionConnectionFactory
. The generated name is used for the application-specific identification of the target RabbitMQ connection. The connection name is displayed in the management UI if the RabbitMQ server supports it. This value doesn’t have to be unique and cannot be used as a connection identifier e.g. in HTTP API requests. This value is supposed to be human-readable and is a part of ClientProperties
under connection_name
key. A simple Lambda can be used:
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
The ConnectionFactory
argument can be used to distinguish target connection names by some logic. By default, the beanName
of the AbstractConnectionFactory
, a hex String representing the object, and an internal counter are used to generate the connection_name
. The <rabbit:connection-factory>
namespace component is also supplied with the connection-name-strategy
attribute.
An implementation SimplePropertyValueConnectionNameStrategy
is provided that sets the connection name to an application property. Declare it as a @Bean
and inject it into the connection factory:
@Bean public ConnectionNameStrategy cns() { return new SimplePropertyValueConnectionNameStrategy("spring.application.name"); } @Bean public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); ... connectionFactory.setConnectionNameStrategy(cns); return connectionFactory; }
The property must exist in the application context’s Environment
.
Note | |
---|---|
When using Spring Boot and it’s autoconfigured connection factory, it is only necessary to declare the |
The connection might be blocked for interaction from the Broker according to the Memory Alarm . Starting with version 2.0, the org.springframework.amqp.rabbit.connection.Connection
can be supplied with com.rabbitmq.client.BlockedListener
s to be notified for connection blocked and unblocked events. In addition the AbstractConnectionFactory
emits a ConnectionBlockedEvent
and ConnectionUnblockedEvent
, respectively, via its internal BlockedListener
implementation. These allow you to provide application logic to react appropriately to problems on the broker and take some corrective actions for example.
Important | |
---|---|
When the application is configured with a single |
Starting with version 2.0.2, the RabbitTemplate
has a configuration option to automatically use a second connection factory, unless transactions are being used. See the section called “Using a Separate Connection” for more information. The ConnectionNameStrategy
for the publisher connection is the same as the primary strategy with .publisher
appended to the result of calling the method.
Starting with version 1.7.7, an AmqpResourceNotAvailableException
is provided, which is thrown now when SimpleConnection.createChannel()
can’t create a Channel
, for example, because the channelMax
limit is reached and there are no available channels in the cache. This exception can be used in the RetryPolicy
to recover the operation after some back-off.
The CachingConnectionFactory
uses an instance of the Rabbit client ConnectionFactory
; a number of configuration properties are passed through (host, port, userName, password, requestedHeartBeat, connectionTimeout
for example) when setting the equivalent property on the CachingConnectionFactory
. To set other properties (clientProperties
for example), define an instance of the rabbit factory and provide a reference to it using the appropriate constructor of the CachingConnectionFactory
. When using the namespace as described above, provide a reference to the configured factory in the connection-factory
attribute. For convenience, a factory bean is provided to assist in configuring the connection factory in a Spring application context, as discussed in the next section.
<rabbit:connection-factory id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
Note | |
---|---|
The 4.0.x client enables automatic recovery by default; while compatible with this feature, Spring AMQP has its own recovery mechanisms and the client recovery feature generally isn’t needed. It is recommended to disable |
Starting with version 1.4, a convenient RabbitConnectionFactoryBean
is provided to enable convenient configuration of SSL properties on the underlying client connection factory, using dependency injection. Other setters simply delegate to the underlying factory. Previously you had to configure the SSL options programmatically.
<rabbit:connection-factory id="rabbitConnectionFactory" connection-factory="clientConnectionFactory" host="${host}" port="${port}" virtual-host="${vhost}" username="${username}" password="${password}" /> <bean id="clientConnectionFactory" class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean"> <property name="useSSL" value="true" /> <property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/> </bean>
Refer to the RabbitMQ Documentation for information about configuring SSL. Omit the keyStore
and trustStore
configuration to connect over SSL without certificate validation. Key and trust store configuration can be provided as follows:
The sslPropertiesLocation
property is a Spring Resource
pointing to a properties file containing the following keys:
keyStore=file:/secret/keycert.p12 trustStore=file:/secret/trustStore keyStore.passPhrase=secret trustStore.passPhrase=secret
The keyStore
and truststore
are Spring Resources
pointing to the stores. Typically this properties file will be secured by the operating system with the application having read access.
Starting with Spring AMQP version 1.5, these properties can be set directly on the factory bean. If both discrete properties and sslPropertiesLocation
is provided, properties in the latter will override the discrete values.
Starting with version 1.3, the AbstractRoutingConnectionFactory
has been introduced. This provides a mechanism to configure mappings for several ConnectionFactories
and determine a target ConnectionFactory
by some lookupKey
at runtime. Typically, the implementation checks a thread-bound context. For convenience, Spring AMQP provides the SimpleRoutingConnectionFactory
, which gets the current thread-bound lookupKey
from the SimpleResourceHolder
:
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory"> <property name="targetConnectionFactories"> <map> <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/> <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/> </map> </property> </bean> <rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService { @Autowired private RabbitTemplate rabbitTemplate; public void service(String vHost, String payload) { SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost); rabbitTemplate.convertAndSend(payload); SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory()); } }
It is important to unbind the resource after use. For more information see the JavaDocs of AbstractRoutingConnectionFactory
.
Starting with version 1.4, the RabbitTemplate
supports the SpEL sendConnectionFactorySelectorExpression
and receiveConnectionFactorySelectorExpression
properties, which are evaluated on each AMQP protocol interaction operation (send
, sendAndReceive
, receive
or receiveAndReply
), resolving to a lookupKey
value for the provided AbstractRoutingConnectionFactory
. Bean references, such as "@vHostResolver.getVHost(#root)"
can be used in the expression. For send
operations, the Message to be sent is the root evaluation object; for receive
operations, the queueName is the root evaluation object.
The routing algorithm is: If the selector expression is null
, or is evaluated to null
, or the provided ConnectionFactory
isn’t an instance of AbstractRoutingConnectionFactory
, everything works as before, relying on the provided ConnectionFactory
implementation. The same occurs if the evaluation result isn’t null
, but there is no target ConnectionFactory
for that lookupKey
and the AbstractRoutingConnectionFactory
is configured with lenientFallback = true
. Of course, in the case of an AbstractRoutingConnectionFactory
it does fallback to its routing
implementation based on determineCurrentLookupKey()
. But, if lenientFallback = false
, an IllegalStateException
is thrown.
The Namespace support also provides the send-connection-factory-selector-expression
and receive-connection-factory-selector-expression
attributes on the <rabbit:template>
component.
Also starting with version 1.4, you can configure a routing connection factory in a listener container. In that case, the list of queue names is used as the lookup key. For example, if you configure the container with setQueueNames("foo", "bar")
, the lookup key will be "[foo,bar]"
(no spaces).
Starting with version 1.6.9 you can add a qualifier to the lookup key using setLookupKeyQualifier
on the listener container. This would enable, for example, listening to queues with the same name, but in different virtual host (where you would have a connection factory for each).
For example, with lookup key qualifier foo
and a container listening to queue bar
, the lookup key you would register the target connection factory with would be foo[bar]
.
When using HA queues in a cluster, for the best performance, it can be desirable to connect to the physical broker where the master queue resides. While the CachingConnectionFactory
can be configured with multiple broker addresses; this is to fail over and the client will attempt to connect in order. The LocalizedQueueConnectionFactory
uses the REST API provided by the admin plugin to determine which node the queue is mastered. It then creates (or retrieves from a cache) a CachingConnectionFactory
that will connect to just that node. If the connection fails, the new master node is determined and the consumer connects to it. The LocalizedQueueConnectionFactory
is configured with a default connection factory, in case the physical location of the queue cannot be determined, in which case it will connect as normal to the cluster.
The LocalizedQueueConnectionFactory
is a RoutingConnectionFactory
and the SimpleMessageListenerContainer
uses the queue names as the lookup key as discussed in the section called “Routing Connection Factory” above.
Note | |
---|---|
For this reason (the use of the queue name for the lookup), the |
Note | |
---|---|
The RabbitMQ management plugin must be enabled on each node. |
Caution | |
---|---|
This connection factory is intended for long-lived connections, such as those used by the |
Here is an example configuration, using Spring Boot’s RabbitProperties to configure the factories:
@Autowired private RabbitProperties props; private final String[] adminUris = { "http://host1:15672", "http://host2:15672" }; private final String[] nodes = { "[email protected] ", "[email protected] " }; @Bean public ConnectionFactory defaultConnectionFactory() { CachingConnectionFactory cf = new CachingConnectionFactory(); cf.setAddresses(this.props.getAddresses()); cf.setUsername(this.props.getUsername()); cf.setPassword(this.props.getPassword()); cf.setVirtualHost(this.props.getVirtualHost()); return cf; } @Bean public ConnectionFactory queueAffinityCF( @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) { return new LocalizedQueueConnectionFactory(defaultCF, StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()), this.adminUris, this.nodes, this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(), false, null); }
Notice that the first three parameters are arrays of addresses
, adminUris
and nodes
. These are positional in that when a container attempts to connect to a queue, it determines on which node the queue is mastered and connects to the address in the same array position.
Confirmed and returned messages are supported by setting the CachingConnectionFactory
's publisherConfirms
and publisherReturns
properties to 'true' respectively.
When these options are set, Channel
s created by the factory are wrapped in an PublisherCallbackChannel
, which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener
with the Channel
. The PublisherCallbackChannel
implementation contains logic to route a confirm/return to the appropriate listener. These features are explained further in the following sections.
Also see simplePublisherConfirms
in the section called “Scoped Operations”.
Tip | |
---|---|
For some more background information, please see the following blog post by the RabbitMQ team titled Introducing Publisher Confirms . |
The connection factory supports registering ConnectionListener
and ChannelListener
implementations. This allows you to receive notifications for connection and channel related events. (A ConnectionListener
is used by the RabbitAdmin
to perform declarations when the connection is established - see the section called “Automatic Declaration of Exchanges, Queues and Bindings” for more information).
ConnectionListener.
@FunctionalInterface public interface ConnectionListener { void onCreate(Connection connection); default void onClose(Connection connection) { } default void onShutDown(ShutdownSignalException signal) { } }
Starting with version 2.0, the org.springframework.amqp.rabbit.connection.Connection
object can be supplied with com.rabbitmq.client.BlockedListener
s to be notified for connection blocked and unblocked events.
ChannelListener.
@FunctionalInterface public interface ChannelListener { void onCreate(Channel channel, boolean transactional); default void onShutDown(ShutdownSignalException signal) { } }
See the section called “Publishing is Asynchronous - How to Detect Success and Failures” for one scenario where you might want to register a ChannelListener
.
A mechanism to enable users to control logging levels was introduced in version 1.5.
The CachingConnectionFactory
uses a default strategy to log channel closures as follows:
- Normal channel closes (200 OK) are not logged.
- If a channel is closed due to a failed passive queue declaration, it is logged at debug level.
- If a channel is closed because the
basic.consume
is refused due to an exclusive consumer condition, it is logged at INFO level. - All others are logged at ERROR level.
To modify this behavior, inject a custom ConditionalExceptionLogger
into the CachingConnectionFactory
in its closeExceptionLogger
property.
Also see the section called “Consumer Events”.
Staring with version 1.6, the CachingConnectionFactory
now provides cache statistics via the getCacheProperties()
method. These statistics can be used to tune the cache to optimize it in production. For example, the high water marks can be used to determine whether the cache size should be increased. If it equals the cache size, you might want to consider increasing further.
Table 3.1. Cache properties for CacheMode.CHANNEL
Property | Meaning |
---|---|
connectionName |
The name of the connection generated by the |
channelCacheSize |
The currently configured maximum channels that are allowed to be idle. |
localPort |
The local port for the connection (if available). This can be used to correlate with connections/channels on the RabbitMQ Admin UI. |
idleChannelsTx |
The number of transactional channels that are currently idle (cached). |
idleChannelsNotTx |
The number of non-transactional channels that are currently idle (cached). |
idleChannelsTxHighWater |
The maximum number of transactional channels that have been concurrently idle (cached). |
idleChannelsNotTxHighWater |
The maximum number of non-transactional channels have been concurrently idle (cached). |
Table 3.2. Cache properties for CacheMode.CONNECTION
Property | Meaning |
---|---|
connectionName:<localPort> |
The name of the connection generated by the |
openConnections |
The number of connection objects representing connections to brokers. |
channelCacheSize |
The currently configured maximum channels that are allowed to be idle. |
connectionCacheSize |
The currently configured maximum connections that are allowed to be idle. |
idleConnections |
The number of connections that are currently idle. |
idleConnectionsHighWater |
The maximum number of connections that have been concurrently idle. |
idleChannelsTx:<localPort> |
The number of transactional channels that are currently idle (cached) for this connection. The localPort part of the property name can be used to correlate with connections/channels on the RabbitMQ Admin UI. |
idleChannelsNotTx:<localPort> |
The number of non-transactional channels that are currently idle (cached) for this connection. The localPort part of the property name can be used to correlate with connections/channels on the RabbitMQ Admin UI. |
idleChannelsTxHighWater: <localPort> |
The maximum number of transactional channels that have been concurrently idle (cached). The localPort part of the property name can be used to correlate with connections/channels on the RabbitMQ Admin UI. |
idleChannelsNotTxHighWater: <localPort> |
The maximum number of non-transactional channels have been concurrently idle (cached). The localPort part of the property name can be used to correlate with connections/channels on the RabbitMQ Admin UI. |
The cacheMode
property (CHANNEL
or CONNECTION
is also included).
Since the first version of Spring AMQP, the framework has provided its own connection and channel recovery in the event of a broker failure. Also, as discussed in Section 3.1.11, “Configuring the broker”, the RabbitAdmin
will re-declare any infrastructure beans (queues etc) when the connection is re-established. It therefore does not rely on the Auto Recovery that is now provided by the amqp-client
library. Spring AMQP now uses the 4.0.x
version of amqp-client
, which has auto recovery enabled by default. Spring AMQP can still use its own recovery mechanisms if you wish, disabling it in the client, (by setting the automaticRecoveryEnabled
property on the underlying RabbitMQ connectionFactory
to false
). However, the framework is completely compatible with auto recovery being enabled. This means any consumers you create within your code (perhaps via RabbitTemplate.execute()
) can be recovered automatically.
The CachingConnectionFactory
now allows you to access the underlying connection factory to allow, for example, setting custom client properties:
connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");
These properties appear in the RabbitMQ Admin UI when viewing the connection.
As with many other high-level abstractions provided by the Spring Framework and related projects, Spring AMQP provides a "template" that plays a central role. The interface that defines the main operations is called AmqpTemplate
. Those operations cover the general behavior for sending and receiving Messages. In other words, they are not unique to any implementation, hence the "AMQP" in the name. On the other hand, there are implementations of that interface that are tied to implementations of the AMQP protocol. Unlike JMS, which is an interface-level API itself, AMQP is a wire-level protocol. The implementations of that protocol provide their own client libraries, so each implementation of the template interface will depend on a particular client library. Currently, there is only a single implementation: RabbitTemplate
. In the examples that follow, you will often see usage of an "AmqpTemplate", but when you look at the configuration examples, or any code excerpts where the template is instantiated and/or setters are invoked, you will see the implementation type (e.g. "RabbitTemplate").
As mentioned above, the AmqpTemplate
interface defines all of the basic operations for sending and receiving Messages. We will explore Message sending and reception, respectively, in the two sections that follow.
See also the section called “AsyncRabbitTemplate”.
Starting with version 1.3 you can now configure the RabbitTemplate
to use a RetryTemplate
to help with handling problems with broker connectivity. Refer to the spring-retry project for complete information; the following is just one example that uses an exponential back off policy and the default SimpleRetryPolicy
which will make three attempts before throwing the exception to the caller.
Using the XML namespace:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="500" /> <property name="multiplier" value="10.0" /> <property name="maxInterval" value="10000" /> </bean> </property> </bean>
Using @Configuration
:
@Bean public AmqpTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); RetryTemplate retryTemplate = new RetryTemplate(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(500); backOffPolicy.setMultiplier(10.0); backOffPolicy.setMaxInterval(10000); retryTemplate.setBackOffPolicy(backOffPolicy); template.setRetryTemplate(retryTemplate); return template; }
Starting with version 1.4, in addition to the retryTemplate
property, the recoveryCallback
option is supported on the RabbitTemplate
. It is used as a second argument for the RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
.
Note | |
---|---|
The |
retryTemplate.execute( new RetryCallback<Object, Exception>() { @Override public Object doWithRetry(RetryContext context) throws Exception { context.setAttribute("message", message); return rabbitTemplate.convertAndSend(exchange, routingKey, message); } }, new RecoveryCallback<Object>() { @Override public Object recover(RetryContext context) throws Exception { Object message = context.getAttribute("message"); Throwable t = context.getLastThrowable(); // Do something with message return null; } }); }
In this case, you would not inject a RetryTemplate
into the RabbitTemplate
.
Publishing messages is an asynchronous mechanism and, by default, messages that can’t be routed are simply dropped by RabbitMQ. For successful publishing you can receive an async confirmation as described in the section called “Publisher Confirms and Returns” below. Let’s consider two failure scenarios:
- publish to an exchange but there is no matching destination queue
- publish to a non-existent exchange
The first case is covered by publisher returns as described in the section called “Publisher Confirms and Returns” below.
For the second case, the message is dropped, no return is generated; the underlying channel is closed with an exception. By default, this exception is logged, but you can register a ChannelListener
with the CachingConnectionFactory
to obtain notifications of such events:
this.connectionFactory.addConnectionListener(new ConnectionListener() { @Override public void onCreate(Connection connection) { } @Override public void onShutDown(ShutdownSignalException signal) { ... } });
You can examine the signal’s reason
property to determine the problem that occurred.
To detect the exception on the sending thread, you can setChannelTransacted(true)
on the RabbitTemplate
and the exception will be detected on the txCommit()
. However, transactions significantly impede performance so consider this carefully before enabling transactions for just this one use case.
The RabbitTemplate
implementation of AmqpTemplate
supports Publisher Confirms and Returns.
For returned messages, the template’s mandatory
property must be set to true
, or the mandatory-expression
must evaluate to true
for a particular message. This feature requires a CachingConnectionFactory
that has its publisherReturns
property set to true (see the section called “Publisher Confirms and Returns”). Returns are sent to the client by it registering a RabbitTemplate.ReturnCallback
by calling setReturnCallback(ReturnCallback callback)
. The callback must implement this method:
void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey);
Only one ReturnCallback
is supported by each RabbitTemplate
. See also the section called “Reply Timeout”.
For Publisher Confirms (aka Publisher Acknowledgements), the template requires a CachingConnectionFactory
that has its publisherConfirms
property set to true. Confirms are sent to the client by it registering a RabbitTemplate.ConfirmCallback
by calling setConfirmCallback(ConfirmCallback callback)
. The callback must implement this method:
void confirm(CorrelationData correlationData, boolean ack, String cause);
The CorrelationData
is an object supplied by the client when sending the original message. The ack
is true for an ack
and false for a nack
. For nack
s, the cause may contain a reason for the nack, if it is available when the nack
is generated. An example is when sending a message to a non-existent exchange. In that case the broker closes the channel; the reason for the closure is included in the cause
. cause
was added in version 1.4.
Only one ConfirmCallback
is supported by a RabbitTemplate
.
Note | |
---|---|
When a rabbit template send operation completes, the channel is closed; this would preclude the reception of confirms or returns in the case when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns/confirms will proceed as normal). When the cache is full, the framework defers the close for up to 5 seconds, in order to allow time for the confirms/returns to be received. When using confirms, the channel will be closed when the last confirm is received. When using only returns, the channel will remain open for the full 5 seconds. It is generally recommended to set the connection factory’s |
Important | |
---|---|
Before version 2.1, channels enabled for publisher confirms were returned to the cache before the confirm(s) were received. Some other process could check out the channel and perform some operation that causes the channel to close - such as publishing a message to a non-existent exchange. This could cause the confirmation to be lost; version 2.1 and later no longer return the channel to the cache while confirmations are outstanding. Since the |
Starting with version 2.1, the CorrelationData
object has a ListenableFuture
which can be used to get the result, instead of using a ConfirmCallback
on the template.
CorrelationData cd1 = new CorrelationData(); this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1); assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
Since it’s a ListenableFuture<Confirm>
, you can either get()
the result when ready, or add listeners for an async callback. The Confirm
object is a simple bean with 2 properties ack
and reason
(for nacks). The reason is not populated for broker-generated nacks; it is populated for nacks generated by the framework (e.g. closing the connection while acks are outstanding).
In addition, when both confirms and returns are enabled, the CorrelationData
is populated with the returned message. It is guaranteed that this will occur before the future is set with the ack.
See also the section called “Scoped Operations” for a simpler mechanism for waiting for publisher confirms.
Normally, when using the template, a Channel
is checked out of the cache (or created), used for the operation, and returned to the cache for reuse. In a multi-threaded environment, there is no guarantee that the next operation will use the same channel. There may be times, however, where you want to have more control over the use of a channel, and ensure that a number of operations are all performed on the same channel.
Starting with version 2.0, a new method invoke
is provided, with an OperationsCallback
. Any operations performed within the scope of the callback, and on the provided RabbitOperations
argument, will use the same dedicated Channel
, which will be closed at the end (not returned to a cache). If the channel is a PublisherCallbackChannel
, it will be returned to the cache after all confirms have been received (see the section called “Publisher Confirms and Returns” above).
@FunctionalInterface public interface OperationsCallback<T> { T doInRabbit(RabbitOperations operations); }
One example of why you might need this is if you wish to use the waitForConfirms()
method on the underlying Channel
. This method was not previously exposed using the Spring API because the channel is, generally, cached and shared as discussed above. The RabbitTemplate
now provides waitForConfirms(long timeout)
and waitForConfirmsOrDie(long timeout)
which delegate to the dedicated channel used within the scope of the OperationsCallback
. The methods cannot be used outside of that scope, for obvious reasons.
Note that a higher-level abstraction which allows you to correlate confirms to requests is provided elsewhere (see the section called “Publisher Confirms and Returns”). If you only want to wait until the broker has confirmed delivery, you can use this simple technique:
Collection<?> messages = getMessagesToSend(); Boolean result = this.template.invoke(t -> { messages.forEach(m -> t.convertAndSend(ROUTE, m)); t.waitForConfirmsOrDie(10_000); return true; });
If you wish RabbitAdmin
operations to be invoked on the same channel, within the scope of the OperationsCallback
, the admin must have been constructed using the same RabbitTemplate
that was used for the invoke
operation.
Note | |
---|---|
The above discussion is moot if the template operations are already performed within the scope of an existing transaction. For example, when running on a transacted listener container thread and performing operations on a transacted template. In that case, the operations will be performed on that channel and committed when the thread returns to the container; it is not necessary to use |
When using confirms in this way, much of the infrastructure set up for correlating confirms to requests is not really needed. Starting with version 2.1, the connection factory supports a new property simplePublisherConfirms
. When this is true
the infrastructure is avoided and the confirm processing will be more efficient.
Furthermore, the RabbitTemplate
sets the property publisherSequenceNumber
in the sent message MessageProperties
. If you wish to check (or log, etc) specific confirmations, you can do so with an overloaded invoke
method:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
Note | |
---|---|
These |
Example:
Collection<?> messages = getMessagesToSend(); Boolean result = this.template.invoke(t -> { messages.forEach(m -> t.convertAndSend(ROUTE, m)); t.waitForConfirmsOrDie(10_000); return true; }, (tag, multiple) -> { log.info("Ack: " + tag + ":" + multiple); }, (tag, multiple) -> { log.info("Nack: " + tag + ":" + multiple); }));
Starting with version 1.4 RabbitMessagingTemplate
, built on top of RabbitTemplate
, provides an integration with the Spring Framework messaging abstraction, i.e. org.springframework.messaging.Message
. This allows you to send and receive messages using the spring-messaging
Message<?>
abstraction. This abstraction is used by other Spring projects such as Spring Integration and Spring’s STOMP support. There are two message converters involved; one to convert between a spring-messaging Message<?>
and Spring AMQP’s Message
abstraction, and one to convert between Spring AMQP’s Message
abstraction and the format required by the underlying RabbitMQ client library. By default, the message payload is converted by the provided RabbitTemplate
's message converter. Alternatively, you can inject a custom MessagingMessageConverter
with some other payload converter:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
Starting with version 1.6, the template now supports a user-id-expression
(userIdExpression
when using Java configuration). If a message is sent, the user id property is set (if not already set) after evaluating this expression. The root object for the evaluation is the message to be sent.
Examples:
<rabbit:template ... user-id-expression="'guest'" /> <rabbit:template ... user-id-expression="@myConnectionFactory.username" />
The first example is a literal expression; the second obtains the username
property from a connection factory bean in the application context.
Starting with version 2.0.2, set the usePublisherConnection
property to true
to use a different connection to that used by listener containers, when possible. This is to avoid consumers being blocked when a producer is blocked for any reason. The CachingConnectionFactory
now maintains a second internal connection factory for this purpose. If the rabbit template is running in a transaction started by the listener container, the container’s channel is used, regardless of this setting.
Important | |
---|---|
In general, you should not use a |
When sending a Message, one can use any of the following methods:
void send(Message message) throws AmqpException; void send(String routingKey, Message message) throws AmqpException; void send(String exchange, String routingKey, Message message) throws AmqpException;
We can begin our discussion with the last method listed above since it is actually the most explicit. It allows an AMQP Exchange name to be provided at runtime along with a routing key. The last parameter is the callback that is responsible for actual creating of the Message instance. An example of using this method to send a Message might look this this:
amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
The "exchange" property can be set on the template itself if you plan to use that template instance to send to the same exchange most or all of the time. In such cases, the second method listed above may be used instead. The following example is functionally equivalent to the previous one:
amqpTemplate.setExchange("marketData.topic"); amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
If both the "exchange" and "routingKey" properties are set on the template, then the method accepting only the Message
may be used:
amqpTemplate.setExchange("marketData.topic"); amqpTemplate.setRoutingKey("quotes.nasdaq.FOO"); amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
A better way of thinking about the exchange and routing key properties is that the explicit method parameters will always override the template’s default values. In fact, even if you do not explicitly set those properties on the template, there are always default values in place. In both cases, the default is an empty String, but that is actually a sensible default. As far as the routing key is concerned, it’s not always necessary in the first place (e.g. a Fanout Exchange). Furthermore, a Queue may be bound to an Exchange with an empty String. Those are both legitimate scenarios for reliance on the default empty String value for the routing key property of the template. As far as the Exchange name is concerned, the empty String is quite commonly used because the AMQP specification defines the "default Exchange" as having no name. Since all Queues are automatically bound to that default Exchange (which is a Direct Exchange) using their name as the binding value, that second method above can be used for simple point-to-point Messaging to any Queue through the default Exchange. Simply provide the queue name as the "routingKey" - either by providing the method parameter at runtime:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
Or, if you prefer to create a template that will be used for publishing primarily or exclusively to a single Queue, the following is perfectly reasonable:
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue template.send(new Message("Hello World".getBytes(), someProperties));
Starting with version 1.3, a message builder API is provided by the MessageBuilder
and MessagePropertiesBuilder
; they provides a convenient "fluent" means of creating a message or message properties:
Message message = MessageBuilder.withBody("foo".getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build();
or
MessageProperties props = MessagePropertiesBuilder.newInstance() .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setMessageId("123") .setHeader("bar", "baz") .build(); Message message = MessageBuilder.withBody("foo".getBytes()) .andProperties(props) .build();
Each of the properties defined on the MessageProperties can be set. Other methods include setHeader(String key, String value)
, removeHeader(String key)
, removeHeaders()
, and copyProperties(MessageProperties properties)
. Each property setting method has a set*IfAbsent()
variant. In the cases where a default initial value exists, the method is named set*IfAbsentOrDefault()
.
Five static methods are provided to create an initial message builder:
public static MessageBuilder withBody(byte[] body) public static MessageBuilder withClonedBody(byte[] body) public static MessageBuilder withBody(byte[] body, int from, int to) public static MessageBuilder fromMessage(Message message) public static MessageBuilder fromClonedMessage(Message message)
The message created by the builder will have a body that is a direct reference to the argument. |
|
The message created by the builder will have a body that is a new array containing a copy of bytes in the argument. |
|
The message created by the builder will have a body that is a new array containing the range of bytes from the argument. See |
|
The message created by the builder will have a body that is a direct reference to the body of the argument. The argument’s properties are copied to a new |
|
The message created by the builder will have a body that is a new array containing a copy of the argument’s body. The argument’s properties are copied to a new |
public static MessagePropertiesBuilder newInstance() public static MessagePropertiesBuilder fromProperties(MessageProperties properties) public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties)
A new message properties object is initialized with default values. |
|
The builder is initialized with, and |
|
The argument’s properties are copied to a new |
With the RabbitTemplate
implementation of AmqpTemplate
, each of the send()
methods has an overloaded version that takes an additional CorrelationData
object. When publisher confirms are enabled, this object is returned in the callback described in Section 3.1.4, “AmqpTemplate”. This allows the sender to correlate a confirm (ack or nack) with the sent message.
Starting with version 1.6.7, the CorrelationAwareMessagePostProcessor
interface was introduced, allowing the correlation data to be modified after the message has been converted:
Message postProcessMessage(Message message, Correlation correlation);
In version 2.0, this interface is deprecated; the method has been moved to MessagePostProcessor
with a default implementation that delegates to postProcessMessage(Message message)
.
Also starting with version 1.6.7 a new callback interface is provided CorrelationDataPostProcessor
; this is invoked after all MessagePostProcessor
s (provided in the send()
method as well as those provided in setBeforePublishPostProcessors()
). Implementations can update or replace the correlation data supplied in the send()
method (if any). The Message
and original CorrelationData
(if any) are provided as arguments.
CorrelationData postProcess(Message message, CorrelationData correlationData);
When the template’s mandatory
property is true returned messages are provided by the callback described in Section 3.1.4, “AmqpTemplate”.
Starting with version 1.4 the RabbitTemplate
supports the SpEL mandatoryExpression
property, which is evaluated against each request message, as the root evaluation object, resolving to a boolean
value. Bean references, such as "@myBean.isMandatory(#root)"
can be used in the expression.
Publisher returns can also be used internally by the RabbitTemplate
in send and receive operations. See the section called “Reply Timeout” for more information.
Starting with version 1.4.2, the BatchingRabbitTemplate
has been introduced. This is a subclass of RabbitTemplate
with an overridden send
method that batches messages according to the BatchingStrategy
; only when a batch is complete is the message sent to RabbitMQ.
public interface BatchingStrategy { MessageBatch addToBatch(String exchange, String routingKey, Message message); Date nextRelease(); Collection<MessageBatch> releaseBatches(); }
Caution | |
---|---|
Batched data is held in memory; unsent messages can be lost in the event of a system failure. |
A SimpleBatchingStrategy
is provided. It supports sending messages to a single exchange/routing key. It has properties:
-
batchSize
- the number of messages in a batch before it is sent -
bufferLimit
- the maximum size of the batched message; this will preempt thebatchSize
if exceeded, and cause a partial batch to be sent -
timeout
- a time after which a partial batch will be sent when there is no new activity adding messages to the batch
The SimpleBatchingStrategy
formats the batch by preceding each embedded message with a 4 byte binary length. This is communicated to the receiving system by setting the springBatchFormat
message property to lengthHeader4
.
Important | |
---|---|
Batched messages are automatically de-batched by listener containers (using the |
Message reception is always a little more complicated than sending. There are two ways to receive a Message
. The simpler option is to poll for a single Message
at a time with a polling method call. The more complicated yet more common approach is to register a listener that will receive Messages
on-demand, asynchronously. We will look at an example of each approach in the next two sub-sections.
The AmqpTemplate
itself can be used for polled Message reception. By default, if no message is available, null
is returned immediately; there is no blocking. Starting with version 1.5, you can now set a receiveTimeout
, in milliseconds, and the receive methods will block for up to that long, waiting for a message. A value less than zero means block indefinitely (or at least until the connection to the broker is lost). Version 1.6 introduced variants of the receive
methods allowing the timeout to be passed in on each call.
Caution | |
---|---|
Since the receive operation creates a new |
There are four simple receive methods available. As with the Exchange on the sending side, there is a method that requires a default queue property having been set directly on the template itself, and there is a method that accepts a queue parameter at runtime. Version 1.6 introduced variants to accept timeoutMillis
to override receiveTimeout
on a per-request basis.
Message receive() throws AmqpException; Message receive(String queueName) throws AmqpException; Message receive(long timeoutMillis) throws AmqpException; Message receive(String queueName, long timeoutMillis) throws AmqpException;
Just like in the case of sending messages, the AmqpTemplate
has some convenience methods for receiving POJOs instead of Message
instances, and implementations will provide a way to customize the MessageConverter
used to create the Object
returned:
Object receiveAndConvert() throws AmqpException; Object receiveAndConvert(String queueName) throws AmqpException; Message receiveAndConvert(long timeoutMillis) throws AmqpException; Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
Starting with version 2.0, there are variants of these methods that take an additional ParameterizedTypeReference
argument to convert complex types. The template must be configured with a SmartMessageConverter
; see the section called “Converting From a Message With RabbitTemplate” for more information.
Similar to sendAndReceive
methods, beginning with version 1.3, the AmqpTemplate
has several convenience receiveAndReply
methods for synchronously receiving, processing and replying to messages:
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException; <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey) throws AmqpException; <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException; <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
The AmqpTemplate
implementation takes care of the receive and reply phases. In most cases you should provide only an implementation of ReceiveAndReplyCallback
to perform some business logic for the received message and build a reply object or message, if needed. Note, a ReceiveAndReplyCallback
may return null
. In this case no reply is sent and receiveAndReply
works like the receive
method. This allows the same queue to be used for a mixture of messages, some of which may not need a reply.
Automatic message (request and reply) conversion is applied only if the provided callback is not an instance of ReceiveAndReplyMessageCallback
- which provides a raw message exchange contract.
The ReplyToAddressCallback
is useful for cases requiring custom logic to determine the replyTo
address at runtime against the received message and reply from the ReceiveAndReplyCallback
. By default, replyTo
information in the request message is used to route the reply.
The following is an example of POJO-based receive and reply…
boolean received = this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() { public Invoice handle(Order order) { return processOrder(order); } }); if (received) { log.info("We received an order!"); }
Important | |
---|---|
Spring AMQP also supports annotated-listener endpoints through the use of the |
Important | |
---|---|
The prefetch default value used to be 1, which could lead to under-utilization of efficient consumers. Starting with version 2.0, the default prefetch value is now 250, which should keep consumers busy in most common scenarios and thus improve throughput. There are nevertheless scenarios where the prefetch value should be low: for example, with large messages, especially if the processing is slow (messages could add up to a large amount of memory in the client process), and if strict message ordering is necessary (the prefetch value should be set back to 1 in this case). Also, with low-volume messaging and multiple consumers (including concurrency within a single listener container instance), you may wish to reduce the prefetch to get a more even distribution of messages across consumers. It is also recomended to use See Section 3.1.17, “Message Listener Container Configuration”. For more background about prefetch, see this post about consumer utilization in RabbitMQ and this post about queuing theory . |
For asynchronous Message reception, a dedicated component (not the AmqpTemplate
) is involved. That component is a container for a Message consuming callback. We will look at the container and its properties in just a moment, but first we should look at the callback since that is where your application code will be integrated with the messaging system. There are a few options for the callback starting with an implementation of the MessageListener
interface:
public interface MessageListener { void onMessage(Message message); }
If your callback logic depends upon the AMQP Channel instance for any reason, you may instead use the ChannelAwareMessageListener
. It looks similar but with an extra parameter:
public interface ChannelAwareMessageListener { void onMessage(Message message, Channel channel) throws Exception; }
Important | |
---|---|
In version 2.1, this interface moved from package |
If you prefer to maintain a stricter separation between your application logic and the messaging API, you can rely upon an adapter implementation that is provided by the framework. This is often referred to as "Message-driven POJO" support.
Note | |
---|---|
Version 1.5 introduced a more flexible mechanism for POJO messaging, the |
When using the adapter, you only need to provide a reference to the instance that the adapter itself should invoke.
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo); listener.setDefaultListenerMethod("myMethod");
You can subclass the adapter and provide an implementation of getListenerMethodName()
to dynamically select different methods based on the message. This method has two parameters, the originalMessage
and extractedMessage
, the latter being the result of any conversion. By default, a SimpleMessageConverter
is configured; see the section called “SimpleMessageConverter” for more information and information about other converters available.
Starting with version 1.4.2, the original message has properties consumerQueue
and consumerTag
which can be used to determine which queue a message was received from.
Starting with version 1.5, you can configure a map of consumer queue/tag to method name, to dynamically select the method to call. If no entry is in the map, we fall back to the default listener method. The default listener method (if not set) is handleMessage
.
Starting with version 2.0, a convenient FunctionalInterface
has been provided:
@FunctionalInterface public interface ReplyingMessageListener<T, R> { R handleMessage(T t); }
This facilitates convenient configuration of the adapter using Java 8 lamdas:
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> { ... return result; }));
Now that you’ve seen the various options for the Message-listening callback, we can turn our attention to the container. Basically, the container handles the "active" responsibilities so that the listener callback can remain passive. The container is an example of a "lifecycle" component. It provides methods for starting and stopping. When configuring the container, you are essentially bridging the gap between an AMQP Queue and the MessageListener
instance. You must provide a reference to the ConnectionFactory
and the queue name(s) or Queue instance(s) from which that listener should consume Messages.
With versions prior to version 2.0, there was one listener container - the SimpleMessageListenerContainer
; there is now a second container - the DirectMessageListenerContainer
. The differences between the containers and criteria you might apply when choosing which to use are described in the section called “Choosing a Container”.
Here is the most basic example using the, SimpleMessageListenerContainer
:
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory); container.setQueueNames("some.queue"); container.setMessageListener(new MessageListenerAdapter(somePojo));
As an "active" component, it’s most common to create the listener container with a bean definition so that it can simply run in the background. This can be done via XML:
<rabbit:listener-container connection-factory="rabbitConnectionFactory"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/> </rabbit:listener-container>
or
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/> </rabbit:listener-container>
will create a DirectMessageListenerContainer
(notice the type
attribute - it defaults to simple
).
Or, you may prefer to use the @Configuration style which will look very similar to the actual code snippet above:
@Configuration public class ExampleAmqpConfiguration { @Bean public SimpleMessageListenerContainer messageListenerContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(rabbitConnectionFactory()); container.setQueueName("some.queue"); container.setMessageListener(exampleListener()); return container; } @Bean public ConnectionFactory rabbitConnectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public MessageListener exampleListener() { return new MessageListener() { public void onMessage(Message message) { System.out.println("received: " + message); } }; } }
Starting with RabbitMQ Version 3.2, the broker now supports consumer priority (see Using Consumer Priorities with RabbitMQ ). This is enabled by setting the x-priority
argument on the consumer. The SimpleMessageListenerContainer
now supports setting consumer arguments:
container.setConsumerArguments(Collections. <String, Object> singletonMap("x-priority", Integer.valueOf(10)));
For convenience, the namespace provides the priority
attribute on the listener
element:
<rabbit:listener-container connection-factory="rabbitConnectionFactory"> <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" /> </rabbit:listener-container>
Starting with version 1.3 the queue(s) on which the container is listening can be modified at runtime; see Section 3.1.20, “Listener Container Queues”.
When a container is configured to listen to auto-delete
queue(s), or the queue has an x-expires
option or the Time-To-Live policy is configured on the Broker, the queue is removed by the broker when the container is stopped (last consumer is cancelled). Before version 1.3, the container could not be restarted because the queue was missing; the RabbitAdmin
only automatically redeclares queues etc, when the connection is closed/opens, which does not happen when the container is stopped/started.
Starting with version 1.3, the container will now use a RabbitAdmin
to redeclare any missing queues during startup.
You can also use conditional declaration (the section called “Conditional Declaration”) together with an auto-startup="false"
admin to defer queue declaration until the container is started.
<rabbit:queue id="otherAnon" declared-by="containerAdmin" /> <rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin"> <rabbit:bindings> <rabbit:binding queue="otherAnon" key="otherAnon" /> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:listener-container id="container2" auto-startup="false"> <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" /> </rabbit:listener-container> <rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />
In this case, the queue and exchange are declared by containerAdmin
which has auto-startup="false"
so the elements are not declared during context initialization. Also, the container is not started for the same reason. When the container is later started, it uses it’s reference to containerAdmin
to declare the elements.
Batched messages are automatically de-batched by listener containers (using the springBatchFormat
message header). Rejecting any message from a batch will cause the entire batch to be rejected. See the section called “Batching” for more information about batching.
The containers publish application events whenever a listener (consumer) experiences a failure of some kind. The event ListenerContainerConsumerFailedEvent
has the following properties:
-
container
- the listener container where the consumer experienced the problem. -
reason
- a textual reason for the failure. -
fatal
- a boolean indicating whether the failure was fatal; with non-fatal exceptions, the container will attempt to restart the consumer, according to therecoveryInterval
orrecoveryBackoff
(for theSimpleMessageListenerContainer
) or themonitorInterval
(for theDirectMessageListenerContainer
). -
throwable
- theThrowable
that was caught.
These events can be consumed by implementing ApplicationListener<ListenerContainerConsumerFailedEvent>
.
Note | |
---|---|
System-wide events (such as connection failures) will be published by all consumers when |
If a consumer fails because one if its queues is being used exclusively, by default, as well as publishing the event, a WARN
log is issued. To change this logging behavior, provide a custom ConditionalExceptionLogger
in the SimpleMessageListenerContainer
's exclusiveConsumerExceptionLogger
property. See also the section called “Logging Channel Close Events”.
Fatal errors are always logged at ERROR
level; this it not modifiable.
Several other events are published at various stages of the container lifecycle:
-
AsyncConsumerStartedEvent
(when the consumer is started) -
AsyncConsumerRestartedEvent
(when the consumer is restarted after a failure -SimpleMessageListenerContainer
only) -
AsyncConsumerTerminatedEvent
(when a consumer is stopped normally) -
AsyncConsumerStoppedEvent
(when the consumer is stopped -SimpleMessageListenerContainer
only) -
ConsumeOkEvent
(when aconsumeOk
is received from the broker, contains the queue name andconsumerTag
) -
ListenerContainerIdleEvent
(see the section called “Detecting Idle Asynchronous Consumers”)
You can provide a strategy to generate consumer tags. By default, the consumer tag will be generated by the broker.
public interface ConsumerTagStrategy { String createConsumerTag(String queue); }
The queue is made available so it can (optionally) be used in the tag.
See Section 3.1.17, “Message Listener Container Configuration”.
The easiest way to receive a message asynchronously is to use the annotated listener endpoint infrastructure. In a nutshell, it allows you to expose a method of a managed bean as a Rabbit listener endpoint.
@Component public class MyService { @RabbitListener(queues = "myQueue") public void processOrder(String data) { ... } }
The idea of the example above is that, whenever a message is available on the queue named myQueue
, the processOrder
method is invoked accordingly (in this case, with the payload of the message).
The annotated endpoint infrastructure creates a message listener container behind the scenes for each annotated method, using a RabbitListenerContainerFactory
.
In the example above, myQueue
must already exist and be bound to some exchange. The queue can be declared and bound automatically, as long as a RabbitAdmin
exists in the application context.
Note | |
---|---|
Property placeholders ( |
@Component public class MyService { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "myQueue", durable = "true"), exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"), key = "orderRoutingKey") ) public void processOrder(Order order) { ... } @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "auto.exch"), key = "invoiceRoutingKey") ) public void processInvoice(Invoice invoice) { ... } @RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true")) public String handleWithSimpleDeclare(String data) { ... } }
In the first example, a queue myQueue
will be declared automatically (durable) together with the exchange, if needed, and bound to the exchange with the routing key. In the second example, an anonymous (exclusive, auto-delete) queue will be declared and bound. Multiple QueueBinding
entries can be provided, allowing the listener to listen to multiple queues. In the third example, a queue with the name retrieved from property my.queue
will be declared if necessary, with the default binding to the default exchange using the queue name as the routing key.
Since version 2.0 the @Exchange
annotation supports any exchange types, including custom. See more information in the AMQP Concepts document.
Use normal @Bean
definitions when more advanced configuration is required.
Notice ignoreDeclarationExceptions
on the exchange in the first example. This allows, for example, binding to an existing exchange that might have different settings (such as internal
). By default the properties of an existing exchange must match.
Starting with version 2.0, you can now bind a queue to an exchange with multiple routing keys:
... key = { "red", "yellow" } ...
You can also specify arguments within @QueueBinding
annotations for queues, exchanges and bindings. For example:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "auto.headers", autoDelete = "true", arguments = @Argument(name = "x-message-ttl", value = "10000", type = "java.lang.Integer")), exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"), arguments = { @Argument(name = "x-match", value = "all"), @Argument(name = "foo", value = "bar"), @Argument(name = "baz") }) ) public String handleWithHeadersExchange(String foo) { ... }
Notice that the x-message-ttl
argument is set to 10 seconds for the queue. Since the argument type is not String
, we have to specify its type; in this case Integer
. As with all such declarations, if the queue exists already, the arguments must match those on the queue. For the header exchange, we set the binding arguments to match messages that have the header foo
set to bar
and the header baz
must be present with any value. The x-match
argument means both conditions must be satisfied.
The argument name, value, and type can be property placeholders (${...}
) or SpEL expressions (#{...}
). The name
must resolve to a String
; the expression for type
must resolve to a Class
or the fully-qualified name of a class. The value
must resolve to something that can be converted by the DefaultConversionService
to the type (such as the x-message-ttl
in the above example).
If a name resolves to null
or an empty String
, that @Argument
is ignored.
Sometimes you may want to use the same configuration for multiple listeners. To reduce the boilerplate configuration, you can use meta-annotations to create your own listener annotation:
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @RabbitListener(bindings = @QueueBinding( value = @Queue, exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT))) public @interface MyAnonFanoutListener { } public class MetaListener { @MyAnonFanoutListener public void handle1(String foo) { ... } @MyAnonFanoutListener public void handle2(String foo) { ... } }
In this example, each listener created by the @MyAnonFanoutListener
annotation will bind an anonymous, auto-delete queue to the fanout exchange metaFanout
. The meta-annotation mechanism is simple in that attributes on the user-defined annotation are not examined - so you can’t override settings from the meta-annotation. Use normal @Bean
definitions when more advanced configuration is required.
To enable support for @RabbitListener
annotations add @EnableRabbit
to one of your @Configuration
classes.
@Configuration @EnableRabbit public class AppConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); return factory; } }
Since version 2.0, a DirectMessageListenerContainerFactory
is also available, which creates DirectMessageListenerContainer
s.
Note | |
---|---|
To choose between the |
By default, the infrastructure looks for a bean named rabbitListenerContainerFactory
as the source for the factory to use to create message listener containers. In this case, and ignoring the RabbitMQ infrastructure setup, the processOrder
method can be invoked with a core poll size of 3 threads and a maximum pool size of 10 threads.
It is possible to customize the listener container factory to use per annotation or an explicit default can be configured by implementing the RabbitListenerConfigurer
interface. The default is only required if at least one endpoint is registered without a specific container factory. See the javadoc for full details and examples.
The container factories provide methods for adding MessagePostProcessor
s that will be applied after receiving messages (before invoking the listener) and before sending replies.
Starting with version 2.0.6 a RetryTemplate
and RecoveryCallback
can be added to the listener container factory, used when sending replies. The RecoveryCallback
is invoked when retries are exhausted; a SendRetryContextAccessor
can be used to get information from the context:
factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
...
return null;
});
If you prefer XML configuration, use the <rabbit:annotation-driven>
element; any beans annotated with @RabbitListener
will be detected.
For SimpleRabbitListenerContainer
s:
<rabbit:annotation-driven/> <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="concurrentConsumers" value="3"/> <property name="maxConcurrentConsumers" value="10"/> </bean>
and for DirectMessageListenerContainer
s:
<rabbit:annotation-driven/> <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="consumersPerQueue" value="3"/> </bean>
Starting with version 2.0, the @RabbitListener
annotation has a concurrency
property; it supports SpEL expressions (#{...}
) and property placeholders (${...}
). Its meaning, and allowed values, depend on the container type.
- For the
DirectMessageListenerContainer
, the value must be a single integer value, which sets theconsumersPerQueue
property on the container. - For the
SimpleRabbitListenerContainer
, the value can be a single integer value, which sets theconcurrentConsumers
property on the container, or it can have the formm-n
wherem
is theconcurrentConsumers
property, andn
is themaxConcurrentConsumers
property.
In either case, this setting overrides the setting(s) on the factory. Previously you had to define different container factories if you had listeners that required different concurrency.
There are two conversion steps in the pipeline before invoking the listener. The first uses a MessageConverter
to convert the incoming Spring AMQP Message
to a spring-messaging Message
. When the target method is invoked, the message payload is converted, if necessary, to the method parameter type.
The default MessageConverter
for the first step is a Spring AMQP SimpleMessageConverter
that handles conversion to String
and java.io.Serializable
objects; all others remain as a byte[]
. In the following discussion, we call this the message converter.
The default converter for the second step is a GenericMessageConverter
which delegates to a conversion service (an instance of DefaultFormattingConversionService
). In the following discussion, we call this the method argument converter.
To change the message converter, simply add it as a property to the container factory bean:
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); ... factory.setMessageConverter(new Jackson2JsonMessageConverter()); ... return factory; }
This configures a Jackson2 converter that expects header information to be present to guide the conversion.
You can also consider a ContentTypeDelegatingMessageConverter
which can handle conversion of different content types.
In most cases, it is not necessary to customize the method argument converter unless, for example, you want to use a custom ConversionService
.
In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper
was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.
Note | |
---|---|
This type inference only works for |
See the section called “Jackson2JsonMessageConverter” for more information.
If you wish to customize the method argument converter, you can do so as follows:
@Configuration @EnableRabbit public class AppConfig implements RabbitListenerConfigurer { ... @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(new GenericMessageConverter(myConversionService())); return factory; } @Bean public ConversionService myConversionService() { DefaultConversionService conv = new DefaultConversionService(); conv.addConverter(mySpecialConverter()); return conv; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } ... }
Important | |
---|---|
for multi-method listeners (see the section called “Multi-Method Listeners”), the method selection is based on the payload of the message after the message conversion; the method argument converter is only called after the method has been selected. |
RabbitListenerEndpoint
provides a model of a Rabbit endpoint and is responsible for configuring the container for that model. The infrastructure allows you to configure endpoints programmatically in addition to the ones that are detected by the RabbitListener
annotation.
@Configuration @EnableRabbit public class AppConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint(); endpoint.setQueueNames("anotherQueue"); endpoint.setMessageListener(message -> { // processing }); registrar.registerEndpoint(endpoint); } }
In the example above, we used SimpleRabbitListenerEndpoint
which provides the actual MessageListener
to invoke but you could just as well build your own endpoint variant describing a custom invocation mechanism.
It should be noted that you could just as well skip the use of @RabbitListener
altogether and only register your endpoints programmatically through RabbitListenerConfigurer.
So far, we have been injecting a simple String in our endpoint but it can actually have a very flexible method signature. Let’s rewrite it to inject the Order
with a custom header:
@Component public class MyService { @RabbitListener(queues = "myQueue") public void processOrder(Order order, @Header("order_type") String orderType) { ... } }
These are the main elements you can inject in listener endpoints:
The raw org.springframework.amqp.core.Message
.
The com.rabbitmq.client.Channel
on which the message was received
The org.springframework.messaging.Message
representing the incoming AMQP message. Note that this message holds both the custom and the standard headers (as defined by AmqpHeaders
).
Note | |
---|---|
Starting with version 1.6, the inbound |
@Header
-annotated method arguments to extract a specific header value, including standard AMQP headers.
@Headers
-annotated argument that must also be assignable to java.util.Map
for getting access to all headers.
A non-annotated element that is not one of the supported types (i.e. Message
and Channel
) is considered to be the payload. You can make that explicit by annotating the parameter with @Payload
. You can also turn on validation by adding an extra @Valid
.
The ability to inject Spring’s Message abstraction is particularly useful to benefit from all the information stored in the transport-specific message without relying on transport-specific API.
@RabbitListener(queues = "myQueue") public void processOrder(Message<Order> order) { ... }
Handling of method arguments is provided by DefaultMessageHandlerMethodFactory
which can be further customized to support additional method arguments. The conversion and validation support can be customized there as well.
For instance, if we want to make sure our Order is valid before processing it, we can annotate the payload with @Valid
and configure the necessary validator as follows:
@Configuration @EnableRabbit public class AppConfig implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setValidator(myValidator()); return factory; } }
When using the queues
attribute, you can specify that the associated container can listen to multiple queues. You can use a @Header
annotation to make the queue name from which a message was received available to the POJO method:
@Component public class MyService { @RabbitListener(queues = { "queue1", "queue2" } ) public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) { ... } }
Starting with version 1.5, you can externalize the queue names using property placeholders, and SpEL:
@Component public class MyService { @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" ) public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) { ... } }
Prior to version 1.5, only a single queue could be specified this way; each queue needed a separate property.
The existing support in MessageListenerAdapter
already allows your method to have a non-void return type. When that’s the case, the result of the invocation is encapsulated in a message sent either in the address specified in the ReplyToAddress
header of the original message or in the default address configured on the listener. That default address can now be set using the @SendTo
annotation of the messaging abstraction.
Assuming our processOrder
method should now return an OrderStatus
, it is possible to write it as follow to automatically send a reply:
@RabbitListener(destination = "myQueue") @SendTo("status") public OrderStatus processOrder(Order order) { // order processing return status; }
If you need to set additional headers in a transport-independent manner, you could return a Message
instead, something like:
@RabbitListener(destination = "myQueue") @SendTo("status") public Message<OrderStatus> processOrder(Order order) { // order processing return MessageBuilder .withPayload(status) .setHeader("code", 1234) .build(); }
The @SendTo
value is assumed as a reply exchange
and routingKey
pair following the pattern exchange/routingKey
, where one of those parts can be omitted. The valid values are:
foo/bar
- the replyTo exchange and routingKey.
foo/
- the replyTo exchange and default (empty) routingKey.
bar
or /bar
- the replyTo routingKey and default (empty) exchange.
/
or empty - the replyTo default exchange and default routingKey.
Also @SendTo
can be used without a value
attribute. This case is equal to an empty sendTo pattern. @SendTo
is only used if the inbound message does not have a replyToAddress
property.
Starting with version 1.5, the @SendTo
value can be a bean initialization SpEL Expression, for example…
@RabbitListener(queues = "test.sendTo.spel") @SendTo("#{spelReplyTo}") public String capitalizeWithSendToSpel(String foo) { return foo.toUpperCase(); } ... @Bean public String spelReplyTo() { return "test.sendTo.reply.spel"; }
The expression must evaluate to a String
, which