3. Reference

This part of the reference documentation details the various components that comprise Spring AMQP. The main chapter covers the core classes to develop an AMQP application. This part also includes a chapter about the sample applications.

3.1 Using Spring AMQP

In this chapter, we will explore the interfaces and classes that are the essential components for developing applications with Spring AMQP.

3.1.1 AMQP Abstractions

Introduction

Spring AMQP consists of a handful of modules, each represented by a JAR in the distribution. These modules are: spring-amqp, and spring-rabbit. The spring-amqp module contains the org.springframework.amqp.core package. Within that package, you will find the classes that represent the core AMQP "model". Our intention is to provide generic abstractions that do not rely on any particular AMQP broker implementation or client library. End user code will be more portable across vendor implementations as it can be developed against the abstraction layer only. These abstractions are then used implemented by broker-specific modules, such as spring-rabbit. There is currently only a RabbitMQ implementation; however the abstractions have been validated in .NET using Apache Qpid in addition to RabbitMQ. Since AMQP operates at the protocol level in principle, the RabbitMQ client can be used with any broker that supports the same protocol version, but we do not test any other brokers at present.

The overview here assumes that you are already familiar with the basics of the AMQP specification. If you are not, then have a look at the resources listed in Chapter 5, Other Resources

Message

The 0-9-1 AMQP specification does not define a Message class or interface. Instead, when performing an operation such as basicPublish(), the content is passed as a byte-array argument and additional properties are passed in as separate arguments. Spring AMQP defines a Message class as part of a more general AMQP domain model representation. The purpose of the Message class is to simply encapsulate the body and properties within a single instance so that the API can in turn be simpler. The Message class definition is quite straightforward.

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
}

The MessageProperties interface defines several common properties such as messageId, timestamp, contentType, and several more. Those properties can also be extended with user-defined headers by calling the setHeader(String key, Object value) method.

[Important] Important

Starting with versions 1.5.7, 1.6.11, 1.7.4, 2.0.0, if a message body is a serialized Serializable java object, it is no longer deserialized (by default) when performing toString() operations (such as in log messages). This is to prevent unsafe deserialization. By default, only java.util and java.lang classes are deserialized. To revert to the previous behavior, you can add allowable class/package patterns by invoking Message.addWhiteListPatterns(...). A simple * wildcard is supported, for example com.foo.*, *.MyClass. Bodies that cannot be deserialized will be represented by byte[<size>] in log messages.

Exchange

The Exchange interface represents an AMQP Exchange, which is what a Message Producer sends to. Each Exchange within a virtual host of a broker will have a unique name as well as a few other properties:

public interface Exchange {

    String getName();

    String getExchangeType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

}

As you can see, an Exchange also has a type represented by constants defined in ExchangeTypes. The basic types are: Direct, Topic, Fanout, and Headers. In the core package you will find implementations of the Exchange interface for each of those types. The behavior varies across these Exchange types in terms of how they handle bindings to Queues. For example, a Direct exchange allows for a Queue to be bound by a fixed routing key (often the Queue’s name). A Topic exchange supports bindings with routing patterns that may include the * and # wildcards for exactly-one and zero-or-more, respectively. The Fanout exchange publishes to all Queues that are bound to it without taking any routing key into consideration. For much more information about these and the other Exchange types, check out Chapter 5, Other Resources.

[Note] Note

The AMQP specification also requires that any broker provide a "default" Direct Exchange that has no name. All Queues that are declared will be bound to that default Exchange with their names as routing keys. You will learn more about the default Exchange’s usage within Spring AMQP in Section 3.1.4, “AmqpTemplate”.

Queue

The Queue class represents the component from which a Message Consumer receives Messages. Like the various Exchange classes, our implementation is intended to be an abstract representation of this core AMQP type.

public class Queue  {

    private final String name;

    private volatile boolean durable;

    private volatile boolean exclusive;

    private volatile boolean autoDelete;

    private volatile Map<String, Object> arguments;

    /**
     * The queue is durable, non-exclusive and non auto-delete.
     *
     * @param name the name of the queue.
     */
    public Queue(String name) {
        this(name, true, false, false);
    }

    // Getters and Setters omitted for brevity

}

Notice that the constructor takes the Queue name. Depending on the implementation, the admin template may provide methods for generating a uniquely named Queue. Such Queues can be useful as a "reply-to" address or other temporary situations. For that reason, the exclusive and autoDelete properties of an auto-generated Queue would both be set to true.

[Note] Note

See the section on queues in Section 3.1.11, “Configuring the broker” for information about declaring queues using namespace support, including queue arguments.

Binding

Given that a producer sends to an Exchange and a consumer receives from a Queue, the bindings that connect Queues to Exchanges are critical for connecting those producers and consumers via messaging. In Spring AMQP, we define a Binding class to represent those connections. Let’s review the basic options for binding Queues to Exchanges.

You can bind a Queue to a DirectExchange with a fixed routing key.

new Binding(someQueue, someDirectExchange, "foo.bar")

You can bind a Queue to a TopicExchange with a routing pattern.

new Binding(someQueue, someTopicExchange, "foo.*")

You can bind a Queue to a FanoutExchange with no routing key.

new Binding(someQueue, someFanoutExchange)

We also provide a BindingBuilder to facilitate a "fluent API" style.

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
[Note] Note

The BindingBuilder class is shown above for clarity, but this style works well when using a static import for the bind() method.

By itself, an instance of the Binding class is just holding the data about a connection. In other words, it is not an "active" component. However, as you will see later in Section 3.1.11, “Configuring the broker”, Binding instances can be used by the AmqpAdmin class to actually trigger the binding actions on the broker. Also, as you will see in that same section, the Binding instances can be defined using Spring’s @Bean-style within @Configuration classes. There is also a convenient base class which further simplifies that approach for generating AMQP-related bean definitions and recognizes the Queues, Exchanges, and Bindings so that they will all be declared on the AMQP broker upon application startup.

The AmqpTemplate is also defined within the core package. As one of the main components involved in actual AMQP messaging, it is discussed in detail in its own section (see Section 3.1.4, “AmqpTemplate”).

3.1.2 Connection and Resource Management

Introduction

Whereas the AMQP model we described in the previous section is generic and applicable to all implementations, when we get into the management of resources, the details are specific to the broker implementation. Therefore, in this section, we will be focusing on code that exists only within our "spring-rabbit" module since at this point, RabbitMQ is the only supported implementation.

The central component for managing a connection to the RabbitMQ broker is the ConnectionFactory interface. The responsibility of a ConnectionFactory implementation is to provide an instance of org.springframework.amqp.rabbit.connection.Connection which is a wrapper for com.rabbitmq.client.Connection. The only concrete implementation we provide is CachingConnectionFactory which, by default, establishes a single connection proxy that can be shared by the application. Sharing of the connection is possible since the "unit of work" for messaging with AMQP is actually a "channel" (in some ways, this is similar to the relationship between a Connection and a Session in JMS). As you can imagine, the connection instance provides a createChannel method. The CachingConnectionFactory implementation supports caching of those channels, and it maintains separate caches for channels based on whether they are transactional or not. When creating an instance of CachingConnectionFactory, the hostname can be provided via the constructor. The username and password properties should be provided as well. If you would like to configure the size of the channel cache (the default is 25), you could call the setChannelCacheSize() method here as well.

Starting with version 1.3, the CachingConnectionFactory can be configured to cache connections as well as just channels. In this case, each call to createConnection() creates a new connection (or retrieves an idle one from the cache). Closing a connection returns it to the cache (if the cache size has not been reached). Channels created on such connections are cached too. The use of separate connections might be useful in some environments, such as consuming from an HA cluster, in conjunction with a load balancer, to connect to different cluster members. Set the cacheMode to CacheMode.CONNECTION.

[Note] Note

This does not limit the number of connections, it specifies how many idle open connections are allowed.

Starting with version 1.5.5, a new property connectionLimit is provided. When this is set, it limits the total number of connections allowed. When set, if the limit is reached, the channelCheckoutTimeLimit is used to wait for a connection to become idle. If the time is exceeded, an AmqpTimeoutException is thrown.

[Important] Important

When the cache mode is CONNECTION, automatic declaration of queues etc. (See the section called “Automatic Declaration of Exchanges, Queues and Bindings”) is NOT supported.

Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default. When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory. Then, the same executor will be used by all connections and its threads can be shared. The executor’s thread pool should be unbounded, or set appropriately for the expected utilization (usually, at least one thread per connection). If multiple channels are created on each connection then the pool size will affect the concurrency, so a variable (or simple cached) thread pool executor would be most suitable.

It is important to understand that the cache size is (by default) not a limit, but merely the number of channels that can be cached. With a cache size of, say, 10, any number of channels can actually be in use. If more than 10 channels are being used and they are all returned to the cache, 10 will go in the cache; the remainder will be physically closed.

Starting with version 1.6, the default channel cache size has been increased from 1 to 25. In high volume, multi-threaded, environments, a small cache means that channels are created and closed at a high rate. Increasing the default cache size will avoid this overhead. You should monitor the channels in use via the RabbitMQ Admin UI and consider increasing the cache size further if you see many channels being created and closed. The cache will only grow on-demand (to suit the concurrency requirements of the application) so this change will not impact existing low-volume applications.

Starting with version 1.4.2, the CachingConnectionFactory has a property channelCheckoutTimeout. When this property is greater than zero, the channelCacheSize becomes a limit on the number of channels that can be created on a connection. If the limit is reached, calling threads will block until a channel is available or this timeout is reached, in which case a AmqpTimeoutException is thrown.

[Warning] Warning

Channels used within the framework (e.g. RabbitTemplate) will be reliably returned to the cache. If you create channels outside of the framework, (e.g. by accessing the connection(s) directly and invoking createChannel()), you must return them (by closing) reliably, perhaps in a finally block, to avoid running out of channels.

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

When using XML, the configuration might look like this:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>
[Note] Note

There is also a SingleConnectionFactory implementation which is only available in the unit test code of the framework. It is simpler than CachingConnectionFactory since it does not cache channels, but it is not intended for practical usage outside of simple tests due to its lack of performance and resilience. If you find a need to implement your own ConnectionFactory for some reason, the AbstractConnectionFactory base class may provide a nice starting point.

A ConnectionFactory can be created quickly and conveniently using the rabbit namespace:

<rabbit:connection-factory id="connectionFactory"/>

In most cases this will be preferable since the framework can choose the best defaults for you. The created instance will be a CachingConnectionFactory. Keep in mind that the default cache size for channels is 25. If you want more channels to be cached set a larger value via the channelCacheSize property. In XML it would look like this:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

And with the namespace you can just add the channel-cache-size attribute:

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

The default cache mode is CHANNEL, but you can configure it to cache connections instead; in this case, we use connection-cache-size:

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

Host and port attributes can be provided using the namespace

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

Alternatively, if running in a clustered environment, use the addresses attribute.

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672"/>

Here’s an example with a custom thread factory that prefixes thread names with rabbitmq-.

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

Naming Connections

Starting with version 1.7 a ConnectionNameStrategy is provided for the injection into the AbstractionConnectionFactory. The generated name is used for the application-specific identification of the target RabbitMQ connection. The connection name is displayed in the management UI if the RabbitMQ server supports it. This value doesn’t have to be unique and cannot be used as a connection identifier e.g. in HTTP API requests. This value is supposed to be human-readable and is a part of ClientProperties under connection_name key. A simple Lambda can be used:

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

The ConnectionFactory argument can be used to distinguish target connection names by some logic. By default, the beanName of the AbstractConnectionFactory, a hex String representing the object, and an internal counter are used to generate the connection_name. The <rabbit:connection-factory> namespace component is also supplied with the connection-name-strategy attribute.

An implementation SimplePropertyValueConnectionNameStrategy is provided that sets the connection name to an application property. Declare it as a @Bean and inject it into the connection factory:

@Bean
public ConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

The property must exist in the application context’s Environment.

[Note] Note

When using Spring Boot and it’s autoconfigured connection factory, it is only necessary to declare the ConnectionNameStrategy @Bean. Boot will auto-detect the bean and wire it into the factory.

Blocked Connections and Resource Constraints

The connection might be blocked for interaction from the Broker according to the Memory Alarm . Starting with version 2.0, the org.springframework.amqp.rabbit.connection.Connection can be supplied with com.rabbitmq.client.BlockedListener s to be notified for connection blocked and unblocked events. In addition the AbstractConnectionFactory emits a ConnectionBlockedEvent and ConnectionUnblockedEvent, respectively, via its internal BlockedListener implementation. These allow you to provide application logic to react appropriately to problems on the broker and take some corrective actions for example.

[Important] Important

When the application is configured with a single CachingConnectionFactory, as it is by default with Spring Boot auto-configuration, the application will stop working when the connection is blocked by the Broker. And when it is blocked by the Broker, any its clients stop to work. If we have producers and consumers in the same application, we may end up with a deadlock when producers are blocking the connection because there are no resources on the Broker anymore and consumers can’t free them because the connection is blocked. To mitigate the problem, it is suggested to have one more separate CachingConnectionFactory instance with the same options - one for producers and one for consumers. A separate CachingConnectionFactory isn’t possible for transactional producers that execute on a consumer thread, since they should reuse the Channel associated with the consumer transactions.

Starting with version 2.0.2, the RabbitTemplate has a configuration option to automatically use a second connection factory, unless transactions are being used. See the section called “Using a Separate Connection” for more information. The ConnectionNameStrategy for the publisher connection is the same as the primary strategy with .publisher appended to the result of calling the method.

Starting with version 1.7.7, an AmqpResourceNotAvailableException is provided, which is thrown now when SimpleConnection.createChannel() can’t create a Channel, for example, because the channelMax limit is reached and there are no available channels in the cache. This exception can be used in the RetryPolicy to recover the operation after some back-off.

Configuring the Underlying Client Connection Factory

The CachingConnectionFactory uses an instance of the Rabbit client ConnectionFactory; a number of configuration properties are passed through (host, port, userName, password, requestedHeartBeat, connectionTimeout for example) when setting the equivalent property on the CachingConnectionFactory. To set other properties (clientProperties for example), define an instance of the rabbit factory and provide a reference to it using the appropriate constructor of the CachingConnectionFactory. When using the namespace as described above, provide a reference to the configured factory in the connection-factory attribute. For convenience, a factory bean is provided to assist in configuring the connection factory in a Spring application context, as discussed in the next section.

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
[Note] Note

The 4.0.x client enables automatic recovery by default; while compatible with this feature, Spring AMQP has its own recovery mechanisms and the client recovery feature generally isn’t needed. It is recommended to disable amqp-client automatic recovery, to avoid getting AutoRecoverConnectionNotCurrentlyOpenException s when the broker is available, but the connection has not yet recovered. You may notice this exception, for example, when a RetryTemplate is configured in a RabbitTemplate, even when failing over to another broker in a cluster. Since the auto recovering connection recovers on a timer, the connection may be recovered faster using Spring AMQP’s recovery mechanisms. Starting with version 1.7.1, Spring AMQP disables it unless you explicitly create your own RabbitMQ connection factory and provide it to the CachingConnectionFactory. RabbitMQ ConnectionFactory instances created by the RabbitConnectionFactoryBean will also have the option disabled by default.

RabbitConnectionFactoryBean and Configuring SSL

Starting with version 1.4, a convenient RabbitConnectionFactoryBean is provided to enable convenient configuration of SSL properties on the underlying client connection factory, using dependency injection. Other setters simply delegate to the underlying factory. Previously you had to configure the SSL options programmatically.

<rabbit:connection-factory id="rabbitConnectionFactory"
    connection-factory="clientConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
</bean>

Refer to the RabbitMQ Documentation for information about configuring SSL. Omit the keyStore and trustStore configuration to connect over SSL without certificate validation. Key and trust store configuration can be provided as follows:

The sslPropertiesLocation property is a Spring Resource pointing to a properties file containing the following keys:

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

The keyStore and truststore are Spring Resources pointing to the stores. Typically this properties file will be secured by the operating system with the application having read access.

Starting with Spring AMQP version 1.5, these properties can be set directly on the factory bean. If both discrete properties and sslPropertiesLocation is provided, properties in the latter will override the discrete values.

Routing Connection Factory

Starting with version 1.3, the AbstractRoutingConnectionFactory has been introduced. This provides a mechanism to configure mappings for several ConnectionFactories and determine a target ConnectionFactory by some lookupKey at runtime. Typically, the implementation checks a thread-bound context. For convenience, Spring AMQP provides the SimpleRoutingConnectionFactory, which gets the current thread-bound lookupKey from the SimpleResourceHolder:

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
	<property name="targetConnectionFactories">
		<map>
			<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
			<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
		</map>
	</property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

It is important to unbind the resource after use. For more information see the JavaDocs of AbstractRoutingConnectionFactory.

Starting with version 1.4, the RabbitTemplate supports the SpEL sendConnectionFactorySelectorExpression and receiveConnectionFactorySelectorExpression properties, which are evaluated on each AMQP protocol interaction operation (send, sendAndReceive, receive or receiveAndReply), resolving to a lookupKey value for the provided AbstractRoutingConnectionFactory. Bean references, such as "@vHostResolver.getVHost(#root)" can be used in the expression. For send operations, the Message to be sent is the root evaluation object; for receive operations, the queueName is the root evaluation object.

The routing algorithm is: If the selector expression is null, or is evaluated to null, or the provided ConnectionFactory isn’t an instance of AbstractRoutingConnectionFactory, everything works as before, relying on the provided ConnectionFactory implementation. The same occurs if the evaluation result isn’t null, but there is no target ConnectionFactory for that lookupKey and the AbstractRoutingConnectionFactory is configured with lenientFallback = true. Of course, in the case of an AbstractRoutingConnectionFactory it does fallback to its routing implementation based on determineCurrentLookupKey(). But, if lenientFallback = false, an IllegalStateException is thrown.

The Namespace support also provides the send-connection-factory-selector-expression and receive-connection-factory-selector-expression attributes on the <rabbit:template> component.

Also starting with version 1.4, you can configure a routing connection factory in a listener container. In that case, the list of queue names is used as the lookup key. For example, if you configure the container with setQueueNames("foo", "bar"), the lookup key will be "[foo,bar]" (no spaces).

Starting with version 1.6.9 you can add a qualifier to the lookup key using setLookupKeyQualifier on the listener container. This would enable, for example, listening to queues with the same name, but in different virtual host (where you would have a connection factory for each).

For example, with lookup key qualifier foo and a container listening to queue bar, the lookup key you would register the target connection factory with would be foo[bar].

Queue Affinity and the LocalizedQueueConnectionFactory

When using HA queues in a cluster, for the best performance, it can be desirable to connect to the physical broker where the master queue resides. While the CachingConnectionFactory can be configured with multiple broker addresses; this is to fail over and the client will attempt to connect in order. The LocalizedQueueConnectionFactory uses the REST API provided by the admin plugin to determine which node the queue is mastered. It then creates (or retrieves from a cache) a CachingConnectionFactory that will connect to just that node. If the connection fails, the new master node is determined and the consumer connects to it. The LocalizedQueueConnectionFactory is configured with a default connection factory, in case the physical location of the queue cannot be determined, in which case it will connect as normal to the cluster.

The LocalizedQueueConnectionFactory is a RoutingConnectionFactory and the SimpleMessageListenerContainer uses the queue names as the lookup key as discussed in the section called “Routing Connection Factory” above.

[Note] Note

For this reason (the use of the queue name for the lookup), the LocalizedQueueConnectionFactory can only be used if the container is configured to listen to a single queue.

[Note] Note

The RabbitMQ management plugin must be enabled on each node.

[Caution] Caution

This connection factory is intended for long-lived connections, such as those used by the SimpleMessageListenerContainer. It is not intended for short connection use, such as with a RabbitTemplate because of the overhead of invoking the REST API before making the connection. Also, for publish operations, the queue is unknown, and the message is published to all cluster members anyway, so the logic of looking up the node has little value.

Here is an example configuration, using Spring Boot’s RabbitProperties to configure the factories:

@Autowired
private RabbitProperties props;

private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };

private final String[] nodes = { "[email protected] ", "[email protected] " };

@Bean
public ConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public ConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            this.adminUris, this.nodes,
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

Notice that the first three parameters are arrays of addresses, adminUris and nodes. These are positional in that when a container attempts to connect to a queue, it determines on which node the queue is mastered and connects to the address in the same array position.

Publisher Confirms and Returns

Confirmed and returned messages are supported by setting the CachingConnectionFactory's publisherConfirms and publisherReturns properties to 'true' respectively.

When these options are set, Channel s created by the factory are wrapped in an PublisherCallbackChannel, which is used to facilitate the callbacks. When such a channel is obtained, the client can register a PublisherCallbackChannel.Listener with the Channel. The PublisherCallbackChannel implementation contains logic to route a confirm/return to the appropriate listener. These features are explained further in the following sections.

Also see simplePublisherConfirms in the section called “Scoped Operations”.

[Tip] Tip

For some more background information, please see the following blog post by the RabbitMQ team titled Introducing Publisher Confirms .

Connection and Channel Listeners

The connection factory supports registering ConnectionListener and ChannelListener implementations. This allows you to receive notifications for connection and channel related events. (A ConnectionListener is used by the RabbitAdmin to perform declarations when the connection is established - see the section called “Automatic Declaration of Exchanges, Queues and Bindings” for more information).

ConnectionListener. 

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

Starting with version 2.0, the org.springframework.amqp.rabbit.connection.Connection object can be supplied with com.rabbitmq.client.BlockedListener s to be notified for connection blocked and unblocked events.

ChannelListener. 

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

See the section called “Publishing is Asynchronous - How to Detect Success and Failures” for one scenario where you might want to register a ChannelListener.

Logging Channel Close Events

A mechanism to enable users to control logging levels was introduced in version 1.5.

The CachingConnectionFactory uses a default strategy to log channel closures as follows:

  • Normal channel closes (200 OK) are not logged.
  • If a channel is closed due to a failed passive queue declaration, it is logged at debug level.
  • If a channel is closed because the basic.consume is refused due to an exclusive consumer condition, it is logged at INFO level.
  • All others are logged at ERROR level.

To modify this behavior, inject a custom ConditionalExceptionLogger into the CachingConnectionFactory in its closeExceptionLogger property.

Also see the section called “Consumer Events”.

Runtime Cache Properties

Staring with version 1.6, the CachingConnectionFactory now provides cache statistics via the getCacheProperties() method. These statistics can be used to tune the cache to optimize it in production. For example, the high water marks can be used to determine whether the cache size should be increased. If it equals the cache size, you might want to consider increasing further.

Table 3.1. Cache properties for CacheMode.CHANNEL

Property Meaning
 connectionName

The name of the connection generated by the ConnectionNameStrategy.

 channelCacheSize

The currently configured maximum channels that are allowed to be idle.

 localPort

The local port for the connection (if available). This can be used to correlate with connections/channels on the RabbitMQ Admin UI.

 idleChannelsTx

The number of transactional channels that are currently idle (cached).

 idleChannelsNotTx

The number of non-transactional channels that are currently idle (cached).

 idleChannelsTxHighWater

The maximum number of transactional channels that have been concurrently idle (cached).

 idleChannelsNotTxHighWater

The maximum number of non-transactional channels have been concurrently idle (cached).


Table 3.2. Cache properties for CacheMode.CONNECTION

Property Meaning
 connectionName:<localPort>

The name of the connection generated by the ConnectionNameStrategy.

 openConnections

The number of connection objects representing connections to brokers.

 channelCacheSize

The currently configured maximum channels that are allowed to be idle.

 connectionCacheSize

The currently configured maximum connections that are allowed to be idle.

 idleConnections

The number of connections that are currently idle.

 idleConnectionsHighWater

The maximum number of connections that have been concurrently idle.

 idleChannelsTx:<localPort>

The number of transactional channels that are currently idle (cached) for this connection. The localPort part of the property name can be used to correlate with connections/channels on the RabbitMQ Admin UI.

 idleChannelsNotTx:<localPort>

The number of non-transactional channels that are currently idle (cached) for this connection. The localPort part of the property name can be used to correlate with connections/channels on the RabbitMQ Admin UI.

 idleChannelsTxHighWater:
<localPort>

The maximum number of transactional channels that have been concurrently idle (cached). The localPort part of the property name can be used to correlate with connections/channels on the RabbitMQ Admin UI.

 idleChannelsNotTxHighWater:
<localPort>

The maximum number of non-transactional channels have been concurrently idle (cached). The localPort part of the property name can be used to correlate with connections/channels on the RabbitMQ Admin UI.


The cacheMode property (CHANNEL or CONNECTION is also included).

Figure 3.1. JVisualVM Example

cacheStats

RabbitMQ Automatic Connection/Topology recovery

Since the first version of Spring AMQP, the framework has provided its own connection and channel recovery in the event of a broker failure. Also, as discussed in Section 3.1.11, “Configuring the broker”, the RabbitAdmin will re-declare any infrastructure beans (queues etc) when the connection is re-established. It therefore does not rely on the Auto Recovery that is now provided by the amqp-client library. Spring AMQP now uses the 4.0.x version of amqp-client, which has auto recovery enabled by default. Spring AMQP can still use its own recovery mechanisms if you wish, disabling it in the client, (by setting the automaticRecoveryEnabled property on the underlying RabbitMQ connectionFactory to false). However, the framework is completely compatible with auto recovery being enabled. This means any consumers you create within your code (perhaps via RabbitTemplate.execute()) can be recovered automatically.

3.1.3 Adding Custom Client Connection Properties

The CachingConnectionFactory now allows you to access the underlying connection factory to allow, for example, setting custom client properties:

connectionFactory.getRabbitConnectionFactory().getClientProperties().put("foo", "bar");

These properties appear in the RabbitMQ Admin UI when viewing the connection.

3.1.4 AmqpTemplate

Introduction

As with many other high-level abstractions provided by the Spring Framework and related projects, Spring AMQP provides a "template" that plays a central role. The interface that defines the main operations is called AmqpTemplate. Those operations cover the general behavior for sending and receiving Messages. In other words, they are not unique to any implementation, hence the "AMQP" in the name. On the other hand, there are implementations of that interface that are tied to implementations of the AMQP protocol. Unlike JMS, which is an interface-level API itself, AMQP is a wire-level protocol. The implementations of that protocol provide their own client libraries, so each implementation of the template interface will depend on a particular client library. Currently, there is only a single implementation: RabbitTemplate. In the examples that follow, you will often see usage of an "AmqpTemplate", but when you look at the configuration examples, or any code excerpts where the template is instantiated and/or setters are invoked, you will see the implementation type (e.g. "RabbitTemplate").

As mentioned above, the AmqpTemplate interface defines all of the basic operations for sending and receiving Messages. We will explore Message sending and reception, respectively, in the two sections that follow.

See also the section called “AsyncRabbitTemplate”.

Adding Retry Capabilities

Starting with version 1.3 you can now configure the RabbitTemplate to use a RetryTemplate to help with handling problems with broker connectivity. Refer to the spring-retry project for complete information; the following is just one example that uses an exponential back off policy and the default SimpleRetryPolicy which will make three attempts before throwing the exception to the caller.

Using the XML namespace:

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

Using @Configuration:

@Bean
public AmqpTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

Starting with version 1.4, in addition to the retryTemplate property, the recoveryCallback option is supported on the RabbitTemplate. It is used as a second argument for the RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback).

[Note] Note

The RecoveryCallback is somewhat limited in that the retry context only contains the lastThrowable field. For more sophisticated use cases, you should use an external RetryTemplate so that you can convey additional information to the RecoveryCallback via the context’s attributes:

retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

In this case, you would not inject a RetryTemplate into the RabbitTemplate.

Publishing is Asynchronous - How to Detect Success and Failures

Publishing messages is an asynchronous mechanism and, by default, messages that can’t be routed are simply dropped by RabbitMQ. For successful publishing you can receive an async confirmation as described in the section called “Publisher Confirms and Returns” below. Let’s consider two failure scenarios:

  • publish to an exchange but there is no matching destination queue
  • publish to a non-existent exchange

The first case is covered by publisher returns as described in the section called “Publisher Confirms and Returns” below.

For the second case, the message is dropped, no return is generated; the underlying channel is closed with an exception. By default, this exception is logged, but you can register a ChannelListener with the CachingConnectionFactory to obtain notifications of such events:

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

You can examine the signal’s reason property to determine the problem that occurred.

To detect the exception on the sending thread, you can setChannelTransacted(true) on the RabbitTemplate and the exception will be detected on the txCommit(). However, transactions significantly impede performance so consider this carefully before enabling transactions for just this one use case.

Publisher Confirms and Returns

The RabbitTemplate implementation of AmqpTemplate supports Publisher Confirms and Returns.

For returned messages, the template’s mandatory property must be set to true, or the mandatory-expression must evaluate to true for a particular message. This feature requires a CachingConnectionFactory that has its publisherReturns property set to true (see the section called “Publisher Confirms and Returns”). Returns are sent to the client by it registering a RabbitTemplate.ReturnCallback by calling setReturnCallback(ReturnCallback callback). The callback must implement this method:

void returnedMessage(Message message, int replyCode, String replyText,
          String exchange, String routingKey);

Only one ReturnCallback is supported by each RabbitTemplate. See also the section called “Reply Timeout”.

For Publisher Confirms (aka Publisher Acknowledgements), the template requires a CachingConnectionFactory that has its publisherConfirms property set to true. Confirms are sent to the client by it registering a RabbitTemplate.ConfirmCallback by calling setConfirmCallback(ConfirmCallback callback). The callback must implement this method:

void confirm(CorrelationData correlationData, boolean ack, String cause);

The CorrelationData is an object supplied by the client when sending the original message. The ack is true for an ack and false for a nack. For nack s, the cause may contain a reason for the nack, if it is available when the nack is generated. An example is when sending a message to a non-existent exchange. In that case the broker closes the channel; the reason for the closure is included in the cause. cause was added in version 1.4.

Only one ConfirmCallback is supported by a RabbitTemplate.

[Note] Note

When a rabbit template send operation completes, the channel is closed; this would preclude the reception of confirms or returns in the case when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns/confirms will proceed as normal). When the cache is full, the framework defers the close for up to 5 seconds, in order to allow time for the confirms/returns to be received. When using confirms, the channel will be closed when the last confirm is received. When using only returns, the channel will remain open for the full 5 seconds. It is generally recommended to set the connection factory’s channelCacheSize to a large enough value so that the channel on which a message is published is returned to the cache instead of being closed. You can monitor channel usage using the RabbitMQ management plugin; if you see channels being opened/closed rapidly you should consider increasing the cache size to reduce overhead on the server.

[Important] Important

Before version 2.1, channels enabled for publisher confirms were returned to the cache before the confirm(s) were received. Some other process could check out the channel and perform some operation that causes the channel to close - such as publishing a message to a non-existent exchange. This could cause the confirmation to be lost; version 2.1 and later no longer return the channel to the cache while confirmations are outstanding. Since the RabbitTemplate performs a logical close() on the channel after each operation; in general, this means that only one confirm will be outstanding on a channel at a time.

Starting with version 2.1, the CorrelationData object has a ListenableFuture which can be used to get the result, instead of using a ConfirmCallback on the template.

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());

Since it’s a ListenableFuture<Confirm>, you can either get() the result when ready, or add listeners for an async callback. The Confirm object is a simple bean with 2 properties ack and reason (for nacks). The reason is not populated for broker-generated nacks; it is populated for nacks generated by the framework (e.g. closing the connection while acks are outstanding).

In addition, when both confirms and returns are enabled, the CorrelationData is populated with the returned message. It is guaranteed that this will occur before the future is set with the ack.

See also the section called “Scoped Operations” for a simpler mechanism for waiting for publisher confirms.

Scoped Operations

Normally, when using the template, a Channel is checked out of the cache (or created), used for the operation, and returned to the cache for reuse. In a multi-threaded environment, there is no guarantee that the next operation will use the same channel. There may be times, however, where you want to have more control over the use of a channel, and ensure that a number of operations are all performed on the same channel.

Starting with version 2.0, a new method invoke is provided, with an OperationsCallback. Any operations performed within the scope of the callback, and on the provided RabbitOperations argument, will use the same dedicated Channel, which will be closed at the end (not returned to a cache). If the channel is a PublisherCallbackChannel, it will be returned to the cache after all confirms have been received (see the section called “Publisher Confirms and Returns” above).

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

One example of why you might need this is if you wish to use the waitForConfirms() method on the underlying Channel. This method was not previously exposed using the Spring API because the channel is, generally, cached and shared as discussed above. The RabbitTemplate now provides waitForConfirms(long timeout) and waitForConfirmsOrDie(long timeout) which delegate to the dedicated channel used within the scope of the OperationsCallback. The methods cannot be used outside of that scope, for obvious reasons.

Note that a higher-level abstraction which allows you to correlate confirms to requests is provided elsewhere (see the section called “Publisher Confirms and Returns”). If you only want to wait until the broker has confirmed delivery, you can use this simple technique:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

If you wish RabbitAdmin operations to be invoked on the same channel, within the scope of the OperationsCallback, the admin must have been constructed using the same RabbitTemplate that was used for the invoke operation.

[Note] Note

The above discussion is moot if the template operations are already performed within the scope of an existing transaction. For example, when running on a transacted listener container thread and performing operations on a transacted template. In that case, the operations will be performed on that channel and committed when the thread returns to the container; it is not necessary to use invoke in that scenario.

When using confirms in this way, much of the infrastructure set up for correlating confirms to requests is not really needed. Starting with version 2.1, the connection factory supports a new property simplePublisherConfirms. When this is true the infrastructure is avoided and the confirm processing will be more efficient.

Furthermore, the RabbitTemplate sets the property publisherSequenceNumber in the sent message MessageProperties. If you wish to check (or log, etc) specific confirmations, you can do so with an overloaded invoke method:

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
[Note] Note

These ConfirmCallback objects (for acks and nacks) are the Rabbit client callbacks, not the template callback.

Example:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));

Messaging integration

Starting with version 1.4 RabbitMessagingTemplate, built on top of RabbitTemplate, provides an integration with the Spring Framework messaging abstraction, i.e. org.springframework.messaging.Message. This allows you to send and receive messages using the spring-messaging Message<?> abstraction. This abstraction is used by other Spring projects such as Spring Integration and Spring’s STOMP support. There are two message converters involved; one to convert between a spring-messaging Message<?> and Spring AMQP’s Message abstraction, and one to convert between Spring AMQP’s Message abstraction and the format required by the underlying RabbitMQ client library. By default, the message payload is converted by the provided RabbitTemplate 's message converter. Alternatively, you can inject a custom MessagingMessageConverter with some other payload converter:

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

Validated User Id

Starting with version 1.6, the template now supports a user-id-expression (userIdExpression when using Java configuration). If a message is sent, the user id property is set (if not already set) after evaluating this expression. The root object for the evaluation is the message to be sent.

Examples:

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

The first example is a literal expression; the second obtains the username property from a connection factory bean in the application context.

Using a Separate Connection

Starting with version 2.0.2, set the usePublisherConnection property to true to use a different connection to that used by listener containers, when possible. This is to avoid consumers being blocked when a producer is blocked for any reason. The CachingConnectionFactory now maintains a second internal connection factory for this purpose. If the rabbit template is running in a transaction started by the listener container, the container’s channel is used, regardless of this setting.

[Important] Important

In general, you should not use a RabbitAdmin with a template that has this set to true. Use the RabbitAdmin constructor that takes a connection factory; if you use the other constructor that takes a template, ensure the template’s property is false. This is because, often, an admin is used to declare queues for listener containers; using a template that has the property set to true would mean that exclusive queues (such as AnonymousQueue) would be declared on a different connection to that used by listener containers. In that case, the queues cannot be used by the containers.

3.1.5 Sending messages

Introduction

When sending a Message, one can use any of the following methods:

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

We can begin our discussion with the last method listed above since it is actually the most explicit. It allows an AMQP Exchange name to be provided at runtime along with a routing key. The last parameter is the callback that is responsible for actual creating of the Message instance. An example of using this method to send a Message might look this this:

amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
    new Message("12.34".getBytes(), someProperties));

The "exchange" property can be set on the template itself if you plan to use that template instance to send to the same exchange most or all of the time. In such cases, the second method listed above may be used instead. The following example is functionally equivalent to the previous one:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

If both the "exchange" and "routingKey" properties are set on the template, then the method accepting only the Message may be used:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

A better way of thinking about the exchange and routing key properties is that the explicit method parameters will always override the template’s default values. In fact, even if you do not explicitly set those properties on the template, there are always default values in place. In both cases, the default is an empty String, but that is actually a sensible default. As far as the routing key is concerned, it’s not always necessary in the first place (e.g. a Fanout Exchange). Furthermore, a Queue may be bound to an Exchange with an empty String. Those are both legitimate scenarios for reliance on the default empty String value for the routing key property of the template. As far as the Exchange name is concerned, the empty String is quite commonly used because the AMQP specification defines the "default Exchange" as having no name. Since all Queues are automatically bound to that default Exchange (which is a Direct Exchange) using their name as the binding value, that second method above can be used for simple point-to-point Messaging to any Queue through the default Exchange. Simply provide the queue name as the "routingKey" - either by providing the method parameter at runtime:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

Or, if you prefer to create a template that will be used for publishing primarily or exclusively to a single Queue, the following is perfectly reasonable:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

Message Builder API

Starting with version 1.3, a message builder API is provided by the MessageBuilder and MessagePropertiesBuilder; they provides a convenient "fluent" means of creating a message or message properties:

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();

or

MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

Each of the properties defined on the MessageProperties can be set. Other methods include setHeader(String key, String value), removeHeader(String key), removeHeaders(), and copyProperties(MessageProperties properties). Each property setting method has a set*IfAbsent() variant. In the cases where a default initial value exists, the method is named set*IfAbsentOrDefault().

Five static methods are provided to create an initial message builder:

public static MessageBuilder withBody(byte[] body) 1

public static MessageBuilder withClonedBody(byte[] body) 2

public static MessageBuilder withBody(byte[] body, int from, int to) 3

public static MessageBuilder fromMessage(Message message) 4

public static MessageBuilder fromClonedMessage(Message message) 5

1

The message created by the builder will have a body that is a direct reference to the argument.

2

The message created by the builder will have a body that is a new array containing a copy of bytes in the argument.

3

The message created by the builder will have a body that is a new array containing the range of bytes from the argument. See Arrays.copyOfRange() for more details.

4

The message created by the builder will have a body that is a direct reference to the body of the argument. The argument’s properties are copied to a new MessageProperties object.

5

The message created by the builder will have a body that is a new array containing a copy of the argument’s body. The argument’s properties are copied to a new MessageProperties object.

public static MessagePropertiesBuilder newInstance() 1

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 2

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 3

1

A new message properties object is initialized with default values.

2

The builder is initialized with, and build() will return, the provided properties object.,

3

The argument’s properties are copied to a new MessageProperties object.

With the RabbitTemplate implementation of AmqpTemplate, each of the send() methods has an overloaded version that takes an additional CorrelationData object. When publisher confirms are enabled, this object is returned in the callback described in Section 3.1.4, “AmqpTemplate”. This allows the sender to correlate a confirm (ack or nack) with the sent message.

Starting with version 1.6.7, the CorrelationAwareMessagePostProcessor interface was introduced, allowing the correlation data to be modified after the message has been converted:

Message postProcessMessage(Message message, Correlation correlation);

In version 2.0, this interface is deprecated; the method has been moved to MessagePostProcessor with a default implementation that delegates to postProcessMessage(Message message).

Also starting with version 1.6.7 a new callback interface is provided CorrelationDataPostProcessor; this is invoked after all MessagePostProcessor s (provided in the send() method as well as those provided in setBeforePublishPostProcessors()). Implementations can update or replace the correlation data supplied in the send() method (if any). The Message and original CorrelationData (if any) are provided as arguments.

CorrelationData postProcess(Message message, CorrelationData correlationData);

Publisher Returns

When the template’s mandatory property is true returned messages are provided by the callback described in Section 3.1.4, “AmqpTemplate”.

Starting with version 1.4 the RabbitTemplate supports the SpEL mandatoryExpression property, which is evaluated against each request message, as the root evaluation object, resolving to a boolean value. Bean references, such as "@myBean.isMandatory(#root)" can be used in the expression.

Publisher returns can also be used internally by the RabbitTemplate in send and receive operations. See the section called “Reply Timeout” for more information.

Batching

Starting with version 1.4.2, the BatchingRabbitTemplate has been introduced. This is a subclass of RabbitTemplate with an overridden send method that batches messages according to the BatchingStrategy; only when a batch is complete is the message sent to RabbitMQ.

public interface BatchingStrategy {

	MessageBatch addToBatch(String exchange, String routingKey, Message message);

	Date nextRelease();

	Collection<MessageBatch> releaseBatches();

}
[Caution] Caution

Batched data is held in memory; unsent messages can be lost in the event of a system failure.

A SimpleBatchingStrategy is provided. It supports sending messages to a single exchange/routing key. It has properties:

  • batchSize - the number of messages in a batch before it is sent
  • bufferLimit - the maximum size of the batched message; this will preempt the batchSize if exceeded, and cause a partial batch to be sent
  • timeout - a time after which a partial batch will be sent when there is no new activity adding messages to the batch

The SimpleBatchingStrategy formats the batch by preceding each embedded message with a 4 byte binary length. This is communicated to the receiving system by setting the springBatchFormat message property to lengthHeader4.

[Important] Important

Batched messages are automatically de-batched by listener containers (using the springBatchFormat message header). Rejecting any message from a batch will cause the entire batch to be rejected.

3.1.6 Receiving messages

Introduction

Message reception is always a little more complicated than sending. There are two ways to receive a Message. The simpler option is to poll for a single Message at a time with a polling method call. The more complicated yet more common approach is to register a listener that will receive Messages on-demand, asynchronously. We will look at an example of each approach in the next two sub-sections.

Polling Consumer

The AmqpTemplate itself can be used for polled Message reception. By default, if no message is available, null is returned immediately; there is no blocking. Starting with version 1.5, you can now set a receiveTimeout, in milliseconds, and the receive methods will block for up to that long, waiting for a message. A value less than zero means block indefinitely (or at least until the connection to the broker is lost). Version 1.6 introduced variants of the receive methods allowing the timeout to be passed in on each call.

[Caution] Caution

Since the receive operation creates a new QueueingConsumer for each message, this technique is not really appropriate for high-volume environments; consider using an asynchronous consumer, or a receiveTimeout of zero for those use cases.

There are four simple receive methods available. As with the Exchange on the sending side, there is a method that requires a default queue property having been set directly on the template itself, and there is a method that accepts a queue parameter at runtime. Version 1.6 introduced variants to accept timeoutMillis to override receiveTimeout on a per-request basis.

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

Just like in the case of sending messages, the AmqpTemplate has some convenience methods for receiving POJOs instead of Message instances, and implementations will provide a way to customize the MessageConverter used to create the Object returned:

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Message receiveAndConvert(long timeoutMillis) throws AmqpException;

Message receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

Starting with version 2.0, there are variants of these methods that take an additional ParameterizedTypeReference argument to convert complex types. The template must be configured with a SmartMessageConverter; see the section called “Converting From a Message With RabbitTemplate” for more information.

Similar to sendAndReceive methods, beginning with version 1.3, the AmqpTemplate has several convenience receiveAndReply methods for synchronously receiving, processing and replying to messages:

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
	   throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
 	throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
	String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
	String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
 	ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
			ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

The AmqpTemplate implementation takes care of the receive and reply phases. In most cases you should provide only an implementation of ReceiveAndReplyCallback to perform some business logic for the received message and build a reply object or message, if needed. Note, a ReceiveAndReplyCallback may return null. In this case no reply is sent and receiveAndReply works like the receive method. This allows the same queue to be used for a mixture of messages, some of which may not need a reply.

Automatic message (request and reply) conversion is applied only if the provided callback is not an instance of ReceiveAndReplyMessageCallback - which provides a raw message exchange contract.

The ReplyToAddressCallback is useful for cases requiring custom logic to determine the replyTo address at runtime against the received message and reply from the ReceiveAndReplyCallback. By default, replyTo information in the request message is used to route the reply.

The following is an example of POJO-based receive and reply…

boolean received =
        this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {

                public Invoice handle(Order order) {
                        return processOrder(order);
                }
        });
if (received) {
        log.info("We received an order!");
}

Asynchronous Consumer

[Important] Important

Spring AMQP also supports annotated-listener endpoints through the use of the @RabbitListener annotation and provides an open infrastructure to register endpoints programmatically. This is by far the most convenient way to setup an asynchronous consumer, see the section called “Annotation-driven Listener Endpoints” for more details.

[Important] Important

The prefetch default value used to be 1, which could lead to under-utilization of efficient consumers. Starting with version 2.0, the default prefetch value is now 250, which should keep consumers busy in most common scenarios and thus improve throughput.

There are nevertheless scenarios where the prefetch value should be low: for example, with large messages, especially if the processing is slow (messages could add up to a large amount of memory in the client process), and if strict message ordering is necessary (the prefetch value should be set back to 1 in this case).

Also, with low-volume messaging and multiple consumers (including concurrency within a single listener container instance), you may wish to reduce the prefetch to get a more even distribution of messages across consumers. It is also recomended to use prefetch = 1 with the MANUAL ack mode. The basicAck is async operation and if something wrong happens on the Broker (double ack for the same delivery tag, for example), you end up with processed subsequent messages in the batch, but unacked on the Broker and other consumer may see them.

See Section 3.1.17, “Message Listener Container Configuration”.

For more background about prefetch, see this post about consumer utilization in RabbitMQ and this post about queuing theory .

Message Listener

For asynchronous Message reception, a dedicated component (not the AmqpTemplate) is involved. That component is a container for a Message consuming callback. We will look at the container and its properties in just a moment, but first we should look at the callback since that is where your application code will be integrated with the messaging system. There are a few options for the callback starting with an implementation of the MessageListener interface:

public interface MessageListener {
    void onMessage(Message message);
}

If your callback logic depends upon the AMQP Channel instance for any reason, you may instead use the ChannelAwareMessageListener. It looks similar but with an extra parameter:

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}
[Important] Important

In version 2.1, this interface moved from package o.s.amqp.rabbit.core to o.s.amqp.rabbit.listener.api.

MessageListenerAdapter

If you prefer to maintain a stricter separation between your application logic and the messaging API, you can rely upon an adapter implementation that is provided by the framework. This is often referred to as "Message-driven POJO" support.

[Note] Note

Version 1.5 introduced a more flexible mechanism for POJO messaging, the @RabbitListener annotation - see the section called “Annotation-driven Listener Endpoints” for more information.

When using the adapter, you only need to provide a reference to the instance that the adapter itself should invoke.

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

You can subclass the adapter and provide an implementation of getListenerMethodName() to dynamically select different methods based on the message. This method has two parameters, the originalMessage and extractedMessage, the latter being the result of any conversion. By default, a SimpleMessageConverter is configured; see the section called “SimpleMessageConverter” for more information and information about other converters available.

Starting with version 1.4.2, the original message has properties consumerQueue and consumerTag which can be used to determine which queue a message was received from.

Starting with version 1.5, you can configure a map of consumer queue/tag to method name, to dynamically select the method to call. If no entry is in the map, we fall back to the default listener method. The default listener method (if not set) is handleMessage.

Starting with version 2.0, a convenient FunctionalInterface has been provided:

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

	R handleMessage(T t);

}

This facilitates convenient configuration of the adapter using Java 8 lamdas:

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
}));
Container

Now that you’ve seen the various options for the Message-listening callback, we can turn our attention to the container. Basically, the container handles the "active" responsibilities so that the listener callback can remain passive. The container is an example of a "lifecycle" component. It provides methods for starting and stopping. When configuring the container, you are essentially bridging the gap between an AMQP Queue and the MessageListener instance. You must provide a reference to the ConnectionFactory and the queue name(s) or Queue instance(s) from which that listener should consume Messages.

With versions prior to version 2.0, there was one listener container - the SimpleMessageListenerContainer; there is now a second container - the DirectMessageListenerContainer. The differences between the containers and criteria you might apply when choosing which to use are described in the section called “Choosing a Container”.

Here is the most basic example using the, SimpleMessageListenerContainer :

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

As an "active" component, it’s most common to create the listener container with a bean definition so that it can simply run in the background. This can be done via XML:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

or

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

will create a DirectMessageListenerContainer (notice the type attribute - it defaults to simple).

Or, you may prefer to use the @Configuration style which will look very similar to the actual code snippet above:

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}
Consumer Priority

Starting with RabbitMQ Version 3.2, the broker now supports consumer priority (see Using Consumer Priorities with RabbitMQ ). This is enabled by setting the x-priority argument on the consumer. The SimpleMessageListenerContainer now supports setting consumer arguments:

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

For convenience, the namespace provides the priority attribute on the listener element:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

Starting with version 1.3 the queue(s) on which the container is listening can be modified at runtime; see Section 3.1.20, “Listener Container Queues”.

auto-delete Queues

When a container is configured to listen to auto-delete queue(s), or the queue has an x-expires option or the Time-To-Live policy is configured on the Broker, the queue is removed by the broker when the container is stopped (last consumer is cancelled). Before version 1.3, the container could not be restarted because the queue was missing; the RabbitAdmin only automatically redeclares queues etc, when the connection is closed/opens, which does not happen when the container is stopped/started.

Starting with version 1.3, the container will now use a RabbitAdmin to redeclare any missing queues during startup.

You can also use conditional declaration (the section called “Conditional Declaration”) together with an auto-startup="false" admin to defer queue declaration until the container is started.

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

In this case, the queue and exchange are declared by containerAdmin which has auto-startup="false" so the elements are not declared during context initialization. Also, the container is not started for the same reason. When the container is later started, it uses it’s reference to containerAdmin to declare the elements.

Batched Messages

Batched messages are automatically de-batched by listener containers (using the springBatchFormat message header). Rejecting any message from a batch will cause the entire batch to be rejected. See the section called “Batching” for more information about batching.

Consumer Events

The containers publish application events whenever a listener (consumer) experiences a failure of some kind. The event ListenerContainerConsumerFailedEvent has the following properties:

  • container - the listener container where the consumer experienced the problem.
  • reason - a textual reason for the failure.
  • fatal - a boolean indicating whether the failure was fatal; with non-fatal exceptions, the container will attempt to restart the consumer, according to the recoveryInterval or recoveryBackoff (for the SimpleMessageListenerContainer) or the monitorInterval (for the DirectMessageListenerContainer).
  • throwable - the Throwable that was caught.

These events can be consumed by implementing ApplicationListener<ListenerContainerConsumerFailedEvent>.

[Note] Note

System-wide events (such as connection failures) will be published by all consumers when concurrentConsumers is greater than 1.

If a consumer fails because one if its queues is being used exclusively, by default, as well as publishing the event, a WARN log is issued. To change this logging behavior, provide a custom ConditionalExceptionLogger in the SimpleMessageListenerContainer 's exclusiveConsumerExceptionLogger property. See also the section called “Logging Channel Close Events”.

Fatal errors are always logged at ERROR level; this it not modifiable.

Several other events are published at various stages of the container lifecycle:

  • AsyncConsumerStartedEvent (when the consumer is started)
  • AsyncConsumerRestartedEvent (when the consumer is restarted after a failure - SimpleMessageListenerContainer only)
  • AsyncConsumerTerminatedEvent (when a consumer is stopped normally)
  • AsyncConsumerStoppedEvent (when the consumer is stopped - SimpleMessageListenerContainer only)
  • ConsumeOkEvent (when a consumeOk is received from the broker, contains the queue name and consumerTag)
  • ListenerContainerIdleEvent (see the section called “Detecting Idle Asynchronous Consumers”)

Consumer Tags

You can provide a strategy to generate consumer tags. By default, the consumer tag will be generated by the broker.

public interface ConsumerTagStrategy {

    String createConsumerTag(String queue);

}

The queue is made available so it can (optionally) be used in the tag.

See Section 3.1.17, “Message Listener Container Configuration”.

Annotation-driven Listener Endpoints

Introduction

The easiest way to receive a message asynchronously is to use the annotated listener endpoint infrastructure. In a nutshell, it allows you to expose a method of a managed bean as a Rabbit listener endpoint.

@Component
public class MyService {

    @RabbitListener(queues = "myQueue")
    public void processOrder(String data) {
        ...
    }

}

The idea of the example above is that, whenever a message is available on the queue named myQueue, the processOrder method is invoked accordingly (in this case, with the payload of the message).

The annotated endpoint infrastructure creates a message listener container behind the scenes for each annotated method, using a RabbitListenerContainerFactory.

In the example above, myQueue must already exist and be bound to some exchange. The queue can be declared and bound automatically, as long as a RabbitAdmin exists in the application context.

[Note] Note

Property placeholders (${some.property}) or SpEL expressions (#{someExpression}) can be specified for the annotation properties (queues etc). See the section called “Listening to Multiple Queues” for an example of why you might use SpEL instead of a property placeholder.

@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(Order order) {
    ...
  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(Invoice invoice) {
    ...
  }

  @RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
  public String handleWithSimpleDeclare(String data) {
      ...
  }

}

In the first example, a queue myQueue will be declared automatically (durable) together with the exchange, if needed, and bound to the exchange with the routing key. In the second example, an anonymous (exclusive, auto-delete) queue will be declared and bound. Multiple QueueBinding entries can be provided, allowing the listener to listen to multiple queues. In the third example, a queue with the name retrieved from property my.queue will be declared if necessary, with the default binding to the default exchange using the queue name as the routing key.

Since version 2.0 the @Exchange annotation supports any exchange types, including custom. See more information in the AMQP Concepts document.

Use normal @Bean definitions when more advanced configuration is required.

Notice ignoreDeclarationExceptions on the exchange in the first example. This allows, for example, binding to an existing exchange that might have different settings (such as internal). By default the properties of an existing exchange must match.

Starting with version 2.0, you can now bind a queue to an exchange with multiple routing keys:

...
    key = { "red", "yellow" }
...

You can also specify arguments within @QueueBinding annotations for queues, exchanges and bindings. For example:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "auto.headers", autoDelete = "true",
                        arguments = @Argument(name = "x-message-ttl", value = "10000",
                                                type = "java.lang.Integer")),
        exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
        arguments = {
                @Argument(name = "x-match", value = "all"),
                @Argument(name = "foo", value = "bar"),
                @Argument(name = "baz")
        })
)
public String handleWithHeadersExchange(String foo) {
    ...
}

Notice that the x-message-ttl argument is set to 10 seconds for the queue. Since the argument type is not String, we have to specify its type; in this case Integer. As with all such declarations, if the queue exists already, the arguments must match those on the queue. For the header exchange, we set the binding arguments to match messages that have the header foo set to bar and the header baz must be present with any value. The x-match argument means both conditions must be satisfied.

The argument name, value, and type can be property placeholders (${...}) or SpEL expressions (#{...}). The name must resolve to a String; the expression for type must resolve to a Class or the fully-qualified name of a class. The value must resolve to something that can be converted by the DefaultConversionService to the type (such as the x-message-ttl in the above example).

If a name resolves to null or an empty String, that @Argument is ignored.

Meta-Annotations

Sometimes you may want to use the same configuration for multiple listeners. To reduce the boilerplate configuration, you can use meta-annotations to create your own listener annotation:

@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "metaFanout", type = ExchangeTypes.FANOUT)))
public @interface MyAnonFanoutListener {
}

public class MetaListener {

    @MyAnonFanoutListener
    public void handle1(String foo) {
        ...
    }

    @MyAnonFanoutListener
    public void handle2(String foo) {
        ...
    }

}

In this example, each listener created by the @MyAnonFanoutListener annotation will bind an anonymous, auto-delete queue to the fanout exchange metaFanout. The meta-annotation mechanism is simple in that attributes on the user-defined annotation are not examined - so you can’t override settings from the meta-annotation. Use normal @Bean definitions when more advanced configuration is required.

Enable Listener Endpoint Annotations

To enable support for @RabbitListener annotations add @EnableRabbit to one of your @Configuration classes.

@Configuration
@EnableRabbit
public class AppConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}

Since version 2.0, a DirectMessageListenerContainerFactory is also available, which creates DirectMessageListenerContainer s.

[Note] Note

To choose between the SimpleRabbitListenerContainerFactory and DirectRabbitListenerContainerFactory see the section called “Choosing a Container”.

By default, the infrastructure looks for a bean named rabbitListenerContainerFactory as the source for the factory to use to create message listener containers. In this case, and ignoring the RabbitMQ infrastructure setup, the processOrder method can be invoked with a core poll size of 3 threads and a maximum pool size of 10 threads.

It is possible to customize the listener container factory to use per annotation or an explicit default can be configured by implementing the RabbitListenerConfigurer interface. The default is only required if at least one endpoint is registered without a specific container factory. See the javadoc for full details and examples.

The container factories provide methods for adding MessagePostProcessor s that will be applied after receiving messages (before invoking the listener) and before sending replies.

Starting with version 2.0.6 a RetryTemplate and RecoveryCallback can be added to the listener container factory, used when sending replies. The RecoveryCallback is invoked when retries are exhausted; a SendRetryContextAccessor can be used to get information from the context:

factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
    Message failed = SendRetryContextAccessor.getMessage(ctx);
    Address replyTo = SendRetryContextAccessor.getAddress(ctx);
    Throwable t = ctx.getLastThrowable();
    ...
    return null;
});

If you prefer XML configuration, use the <rabbit:annotation-driven> element; any beans annotated with @RabbitListener will be detected.

For SimpleRabbitListenerContainer s:

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="concurrentConsumers" value="3"/>
    <property name="maxConcurrentConsumers" value="10"/>
</bean>

and for DirectMessageListenerContainer s:

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
      class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="consumersPerQueue" value="3"/>
</bean>

Starting with version 2.0, the @RabbitListener annotation has a concurrency property; it supports SpEL expressions (#{...}) and property placeholders (${...}). Its meaning, and allowed values, depend on the container type.

  • For the DirectMessageListenerContainer, the value must be a single integer value, which sets the consumersPerQueue property on the container.
  • For the SimpleRabbitListenerContainer, the value can be a single integer value, which sets the concurrentConsumers property on the container, or it can have the form m-n where m is the concurrentConsumers property, and n is the maxConcurrentConsumers property.

In either case, this setting overrides the setting(s) on the factory. Previously you had to define different container factories if you had listeners that required different concurrency.

Message Conversion for Annotated Methods

There are two conversion steps in the pipeline before invoking the listener. The first uses a MessageConverter to convert the incoming Spring AMQP Message to a spring-messaging Message. When the target method is invoked, the message payload is converted, if necessary, to the method parameter type.

The default MessageConverter for the first step is a Spring AMQP SimpleMessageConverter that handles conversion to String and java.io.Serializable objects; all others remain as a byte[]. In the following discussion, we call this the message converter.

The default converter for the second step is a GenericMessageConverter which delegates to a conversion service (an instance of DefaultFormattingConversionService). In the following discussion, we call this the method argument converter.

To change the message converter, simply add it as a property to the container factory bean:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    ...
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    ...
    return factory;
}

This configures a Jackson2 converter that expects header information to be present to guide the conversion.

You can also consider a ContentTypeDelegatingMessageConverter which can handle conversion of different content types.

In most cases, it is not necessary to customize the method argument converter unless, for example, you want to use a custom ConversionService.

In versions prior to 1.6, the type information to convert the JSON had to be provided in message headers, or a custom ClassMapper was required. Starting with version 1.6, if there are no type information headers, the type can be inferred from the target method arguments.

[Note] Note

This type inference only works for @RabbitListener at the method level.

See the section called “Jackson2JsonMessageConverter” for more information.

If you wish to customize the method argument converter, you can do so as follows:

@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

    ...

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new GenericMessageConverter(myConversionService()));
        return factory;
    }

    @Bean
    public ConversionService myConversionService() {
        DefaultConversionService conv = new DefaultConversionService();
        conv.addConverter(mySpecialConverter());
        return conv;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    ...

}
[Important] Important

for multi-method listeners (see the section called “Multi-Method Listeners”), the method selection is based on the payload of the message after the message conversion; the method argument converter is only called after the method has been selected.

Programmatic Endpoint Registration

RabbitListenerEndpoint provides a model of a Rabbit endpoint and is responsible for configuring the container for that model. The infrastructure allows you to configure endpoints programmatically in addition to the ones that are detected by the RabbitListener annotation.

@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
        endpoint.setQueueNames("anotherQueue");
        endpoint.setMessageListener(message -> {
            // processing
        });
        registrar.registerEndpoint(endpoint);
    }
}

In the example above, we used SimpleRabbitListenerEndpoint which provides the actual MessageListener to invoke but you could just as well build your own endpoint variant describing a custom invocation mechanism.

It should be noted that you could just as well skip the use of @RabbitListener altogether and only register your endpoints programmatically through RabbitListenerConfigurer.

Annotated Endpoint Method Signature

So far, we have been injecting a simple String in our endpoint but it can actually have a very flexible method signature. Let’s rewrite it to inject the Order with a custom header:

@Component
public class MyService {

    @RabbitListener(queues = "myQueue")
    public void processOrder(Order order, @Header("order_type") String orderType) {
        ...
    }
}

These are the main elements you can inject in listener endpoints:

The raw org.springframework.amqp.core.Message.

The com.rabbitmq.client.Channel on which the message was received

The org.springframework.messaging.Message representing the incoming AMQP message. Note that this message holds both the custom and the standard headers (as defined by AmqpHeaders).

[Note] Note

Starting with version 1.6, the inbound deliveryMode header is now available in the header with name AmqpHeaders.RECEIVED_DELIVERY_MODE instead of AmqpHeaders.DELIVERY_MODE.

@Header-annotated method arguments to extract a specific header value, including standard AMQP headers.

@Headers-annotated argument that must also be assignable to java.util.Map for getting access to all headers.

A non-annotated element that is not one of the supported types (i.e. Message and Channel) is considered to be the payload. You can make that explicit by annotating the parameter with @Payload. You can also turn on validation by adding an extra @Valid.

The ability to inject Spring’s Message abstraction is particularly useful to benefit from all the information stored in the transport-specific message without relying on transport-specific API.

@RabbitListener(queues = "myQueue")
public void processOrder(Message<Order> order) { ...
}

Handling of method arguments is provided by DefaultMessageHandlerMethodFactory which can be further customized to support additional method arguments. The conversion and validation support can be customized there as well.

For instance, if we want to make sure our Order is valid before processing it, we can annotate the payload with @Valid and configure the necessary validator as follows:

@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(myValidator());
        return factory;
    }
}
Listening to Multiple Queues

When using the queues attribute, you can specify that the associated container can listen to multiple queues. You can use a @Header annotation to make the queue name from which a message was received available to the POJO method:

@Component
public class MyService {

    @RabbitListener(queues = { "queue1", "queue2" } )
    public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        ...
    }

}

Starting with version 1.5, you can externalize the queue names using property placeholders, and SpEL:

@Component
public class MyService {

    @RabbitListener(queues = "#{'${property.with.comma.delimited.queue.names}'.split(',')}" )
    public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        ...
    }

}

Prior to version 1.5, only a single queue could be specified this way; each queue needed a separate property.

Reply Management

The existing support in MessageListenerAdapter already allows your method to have a non-void return type. When that’s the case, the result of the invocation is encapsulated in a message sent either in the address specified in the ReplyToAddress header of the original message or in the default address configured on the listener. That default address can now be set using the @SendTo annotation of the messaging abstraction.

Assuming our processOrder method should now return an OrderStatus, it is possible to write it as follow to automatically send a reply:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

If you need to set additional headers in a transport-independent manner, you could return a Message instead, something like:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
        .withPayload(status)
        .setHeader("code", 1234)
        .build();
}

The @SendTo value is assumed as a reply exchange and routingKey pair following the pattern exchange/routingKey, where one of those parts can be omitted. The valid values are:

foo/bar - the replyTo exchange and routingKey.

foo/ - the replyTo exchange and default (empty) routingKey.

bar or /bar - the replyTo routingKey and default (empty) exchange.

/ or empty - the replyTo default exchange and default routingKey.

Also @SendTo can be used without a value attribute. This case is equal to an empty sendTo pattern. @SendTo is only used if the inbound message does not have a replyToAddress property.

Starting with version 1.5, the @SendTo value can be a bean initialization SpEL Expression, for example…

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
    return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
    return "test.sendTo.reply.spel";
}

The expression must evaluate to a String, which