38. RabbitMQ Binder
38.1 Usage
To use the RabbitMQ binder, add it to your Spring Cloud Stream application with the following Maven coordinates:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
Alternatively, you can use the Spring Cloud Stream RabbitMQ Starter:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
38.2 RabbitMQ Binder Overview
A simplified diagram of how the RabbitMQ binder operates can be seen below.
Figure 38.1. RabbitMQ Binder
The RabbitMQ Binder implementation maps each destination to a TopicExchange. For each consumer group, a Queue is bound to that TopicExchange. Each consumer instance has a corresponding RabbitMQ Consumer instance for its group’s Queue. For partitioned producers and consumers, the queues are suffixed with the partition index and use the partition index as the routing key.
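The naming rules above can be sketched in plain Java. This is only an illustration, not the binder's own code: the class `BinderNamingSketch`, its method names, and the `orders`/`billing` values are hypothetical, and the `destination.group` queue name, the `-<index>` suffix, and the `prefix` setting are taken from the property descriptions later in this chapter.

```java
/** Hypothetical helper that mirrors the naming conventions described in this chapter. */
final class BinderNamingSketch {

    /** Queue for a consumer group: prefix + destination.group, plus "-<index>" when partitioned. */
    static String queueName(String prefix, String destination, String group, Integer partitionIndex) {
        String name = prefix + destination + "." + group;
        return partitionIndex == null ? name : name + "-" + partitionIndex;
    }

    /** Routing key used when publishing: destination, or destination-<index> when partitioned. */
    static String routingKey(String destination, Integer partitionIndex) {
        return partitionIndex == null ? destination : destination + "-" + partitionIndex;
    }

    public static void main(String[] args) {
        // Non-partitioned: one queue per group.
        System.out.println(queueName("", "orders", "billing", null));
        // Partitioned: one queue per partition, selected by a matching routing key.
        System.out.println(queueName("", "orders", "billing", 1));
        System.out.println(routingKey("orders", 1));
    }
}
```

With two partitions, the sketch yields one queue per partition index, and a message published with the key for partition 1 reaches only the queue bound with that key.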
Using the autoBindDlq option, you can configure the binder to create and configure dead-letter queues (DLQs) and a dead-letter exchange (DLX). The dead-letter queue has the name of the destination, appended with .dlq. If retry is enabled (maxAttempts > 1), failed messages are delivered to the DLQ once retries are exhausted. If retry is disabled (maxAttempts = 1), set requeueRejected to false (the default) so that a failed message is routed to the DLQ instead of being requeued. In addition, republishToDlq causes the binder to publish a failed message to the DLQ (instead of rejecting it); this lets the binder add information to the message headers, such as the stack trace in the x-exception-stacktrace header. This option does not need retry enabled; you can republish a failed message after just one attempt. Starting with version 1.2, you can configure the delivery mode of republished messages; see the republishDeliveryMode property.
Setting requeueRejected to true causes the message to be requeued and redelivered continually, which is likely not what you want unless the failure is transient. In general, it is better to enable retry within the binder by setting maxAttempts to greater than one, or to set republishToDlq to true.
See Section 38.3.1, “RabbitMQ Binder Properties” for more information about these properties.
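How these options interact can be summarized as a small decision function. The sketch below is one simplified reading of the paragraphs above, not binder code: `FailureOutcomeSketch`, its `Outcome` values, and the `dlqConfigured` flag are hypothetical names, and it assumes that a DLQ exists whenever republishToDlq is enabled. The DISCARDED branch reflects standard broker behavior for a message rejected without requeue when no dead-letter exchange is set.

```java
/** Hypothetical summary of what happens to a message after its final failed delivery. */
final class FailureOutcomeSketch {

    enum Outcome { REPUBLISHED_TO_DLQ, DEAD_LETTERED, DISCARDED, REQUEUED }

    static Outcome afterFinalFailure(int maxAttempts, boolean requeueRejected,
                                     boolean republishToDlq, boolean dlqConfigured) {
        if (republishToDlq) {
            // The binder publishes the message itself and adds diagnostic headers.
            // Assumption: a DLQ is bound, so the republished message has somewhere to go.
            return Outcome.REPUBLISHED_TO_DLQ;
        }
        boolean retryEnabled = maxAttempts > 1;
        if (!retryEnabled && requeueRejected) {
            // Redelivered again and again until the failure goes away.
            return Outcome.REQUEUED;
        }
        // Rejected without requeue: the broker dead-letters it if it can, otherwise drops it.
        return dlqConfigured ? Outcome.DEAD_LETTERED : Outcome.DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println(afterFinalFailure(3, false, false, true));
        System.out.println(afterFinalFailure(1, true, false, true));
        System.out.println(afterFinalFailure(1, false, true, true));
    }
}
```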
The framework does not provide any standard mechanism to consume dead-letter messages (or to re-route them back to the primary queue). Some options are described in Section 38.6, “Dead-Letter Queue Processing”.
When multiple RabbitMQ binders are used in a Spring Cloud Stream application, it is important to disable RabbitAutoConfiguration so that the same configuration from RabbitAutoConfiguration is not applied to both binders.
Starting with version 1.3, the RabbitMessageChannelBinder creates an internal ConnectionFactory copy for the non-transactional producers to avoid deadlocks on consumers when shared, cached connections are blocked because of a memory alarm on the broker.
38.3 Configuration Options
This section contains settings specific to the RabbitMQ Binder and bound channels.
For general binding configuration options and properties, refer to the Spring Cloud Stream core documentation.
38.3.1 RabbitMQ Binder Properties
By default, the RabbitMQ binder uses Spring Boot’s ConnectionFactory, and it therefore supports all Spring Boot configuration options for RabbitMQ. (For reference, consult the Spring Boot documentation.) RabbitMQ configuration options use the spring.rabbitmq prefix.
In addition to Spring Boot options, the RabbitMQ binder supports the following properties:
spring.cloud.stream.rabbit.binder.adminAddresses
- A comma-separated list of RabbitMQ management plugin URLs. Only used when nodes contains more than one entry. Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses. Only needed if you are using a RabbitMQ cluster and wish to consume from the node that hosts the queue. See Queue Affinity and the LocalizedQueueConnectionFactory for more information.
Default: empty.
spring.cloud.stream.rabbit.binder.nodes
- A comma-separated list of RabbitMQ node names. When there is more than one entry, the list is used to locate the server address where a queue is located. Each entry in this list must have a corresponding entry in spring.rabbitmq.addresses. Only needed if you are using a RabbitMQ cluster and wish to consume from the node that hosts the queue. See Queue Affinity and the LocalizedQueueConnectionFactory for more information.
Default: empty.
spring.cloud.stream.rabbit.binder.compressionLevel
- Compression level for compressed bindings. See java.util.zip.Deflater.
Default: 1 (BEST_SPEED).
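To make the meaning of the level concrete, the following sketch uses java.util.zip.Deflater directly, where level 1 is the constant Deflater.BEST_SPEED and level 9 is Deflater.BEST_COMPRESSION. It is not the binder's own compression path; the class `CompressionLevelSketch` and its methods are hypothetical and only show that a payload compressed at a given level round-trips unchanged.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical demo of Deflater compression levels; not the binder's implementation. */
final class CompressionLevelSketch {

    static byte[] compress(byte[] input, int level) {
        Deflater deflater = new Deflater(level);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            out.write(buffer, 0, deflater.deflate(buffer));
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int read = inflater.inflate(buffer);
                if (read == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("Truncated or unsupported input");
                }
                out.write(buffer, 0, read);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("Not valid deflate data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = "hello hello hello hello hello".getBytes(StandardCharsets.UTF_8);
        byte[] fast = compress(payload, Deflater.BEST_SPEED);        // level 1
        byte[] small = compress(payload, Deflater.BEST_COMPRESSION); // level 9
        System.out.println("level 1 bytes: " + fast.length + ", level 9 bytes: " + small.length);
        System.out.println("round trip ok: " + Arrays.equals(payload, decompress(fast)));
    }
}
```

Lower levels favor throughput over size, which is usually the sensible trade-off for message payloads.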
38.3.2 RabbitMQ Consumer Properties
The following properties are available for Rabbit consumers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
acknowledgeMode
- The acknowledge mode.
Default: AUTO.
autoBindDlq
- Whether to automatically declare the DLQ and bind it to the binder DLX.
Default: false.
bindingRoutingKey
- The routing key with which to bind the queue to the exchange (if bindQueue is true). For partitioned destinations, -<instanceIndex> is appended.
Default: #.
bindQueue
- Whether to bind the queue to the destination exchange; set to false if you have set up your own infrastructure and have previously created and bound the queue.
Default: true.
deadLetterQueueName
- The name of the DLQ.
Default: prefix+destination.dlq
deadLetterExchange
- A DLX to assign to the queue; relevant only if autoBindDlq is true.
Default: 'prefix+DLX'
deadLetterRoutingKey
- A dead-letter routing key to assign to the queue; relevant only if autoBindDlq is true.
Default: destination
declareExchange
- Whether to declare the exchange for the destination.
Default: true.
delayedExchange
- Whether to declare the exchange as a Delayed Message Exchange. Requires the delayed message exchange plugin on the broker. The x-delayed-type argument is set to the exchangeType.
Default: false.
dlqDeadLetterExchange
- If a DLQ is declared, a DLX to assign to that queue.
Default: none
dlqDeadLetterRoutingKey
- If a DLQ is declared, a dead-letter routing key to assign to that queue.
Default: none
dlqExpires
- How long before an unused dead-letter queue is deleted (ms).
Default: no expiration
dlqLazy
- Declare the dead-letter queue with the x-queue-mode=lazy argument. See Lazy Queues. Consider using a policy instead of this setting, because a policy allows changing the setting without deleting the queue.
Default: false.
dlqMaxLength
- Maximum number of messages in the dead-letter queue.
Default: no limit
dlqMaxLengthBytes
- Maximum number of total bytes in the dead-letter queue from all messages.
Default: no limit
dlqMaxPriority
- Maximum priority of messages in the dead-letter queue (0-255).
Default: none
dlqTtl
- Default time to live to apply to the dead-letter queue when declared (ms).
Default: no limit
durableSubscription
- Whether the subscription should be durable. Only effective if group is also set.
Default: true.
exchangeAutoDelete
- If declareExchange is true, whether the exchange should be auto-delete (removed after the last queue is removed).
Default: true.
exchangeDurable
- If declareExchange is true, whether the exchange should be durable (survives broker restart).
Default: true.
exchangeType
- The exchange type: direct, fanout or topic for non-partitioned destinations; direct or topic for partitioned destinations.
Default: topic.
exclusive
- Create an exclusive consumer; concurrency should be 1 when this is true. Often used when strict ordering is required but a hot standby instance should take over after a failure. See recoveryInterval, which controls how often a standby instance attempts to consume.
Default: false.
expires
- How long before an unused queue is deleted (ms).
Default: no expiration
failedDeclarationRetryInterval
- The interval (ms) between attempts to consume from a queue if it is missing.
Default: 5000
headerPatterns
- Patterns for headers to be mapped from inbound messages.
Default: ['*'] (all headers).
lazy
- Declare the queue with the x-queue-mode=lazy argument. See Lazy Queues. Consider using a policy instead of this setting, because a policy allows changing the setting without deleting the queue.
Default: false.
maxConcurrency
- The maximum number of consumers.
Default: 1.
maxLength
- Maximum number of messages in the queue.
Default: no limit
maxLengthBytes
- Maximum number of total bytes in the queue from all messages.
Default: no limit
maxPriority
- Maximum priority of messages in the queue (0-255).
Default: none
missingQueuesFatal
- If the queue cannot be found, treat the condition as fatal and stop the listener container. Defaults to false so that the container keeps trying to consume from the queue, for example when using a cluster and the node hosting a non-HA queue is down.
Default: false
prefetch
- Prefetch count.
Default: 1.
prefix
- A prefix to be added to the name of the destination and queues.
Default: "".
queueDeclarationRetries
- The number of times to retry consuming from a queue if it is missing. Only relevant if missingQueuesFatal is true; otherwise the container keeps retrying indefinitely.
Default: 3
queueNameGroupOnly
- When true, consume from a queue with a name equal to the group; otherwise the queue name is destination.group. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue.
Default: false.
recoveryInterval
- The interval between connection recovery attempts, in milliseconds.
Default: 5000.
requeueRejected
- Whether delivery failures should be requeued when retry is disabled or republishToDlq is false.
Default: false.
republishDeliveryMode
- When republishToDlq is true, specify the delivery mode of the republished message.
Default: DeliveryMode.PERSISTENT
republishToDlq
- By default, messages that fail after retries are exhausted are rejected. If a dead-letter queue (DLQ) is configured, RabbitMQ routes the failed message (unchanged) to the DLQ. If set to true, the binder republishes failed messages to the DLQ with additional headers, including the exception message and stack trace from the cause of the final failure.
Default: false
transacted
- Whether to use transacted channels.
Default: false.
ttl
- Default time to live to apply to the queue when declared (ms).
Default: no limit
txSize
- The number of deliveries between acks.
Default: 1.
38.3.3 Rabbit Producer Properties
The following properties are available for Rabbit producers only and must be prefixed with spring.cloud.stream.rabbit.bindings.<channelName>.producer.
autoBindDlq
- Whether to automatically declare the DLQ and bind it to the binder DLX.
Default: false.
batchingEnabled
- Whether to enable message batching by producers.
Default: false.
batchSize
- The number of messages to buffer when batching is enabled.
Default: 100.
batchBufferLimit
Default: 10000.
batchTimeout
Default: 5000.
bindingRoutingKey
- The routing key with which to bind the queue to the exchange (if bindQueue is true). Only applies to non-partitioned destinations. Only applies if requiredGroups are provided and then only to those groups.
Default: #.
bindQueue
- Whether to bind the queue to the destination exchange; set to false if you have set up your own infrastructure and have previously created and bound the queue. Only applies if requiredGroups are provided and then only to those groups.
Default: true.
compress
- Whether data should be compressed when sent.
Default: false.
deadLetterQueueName
- The name of the DLQ. Only applies if requiredGroups are provided and then only to those groups.
Default: prefix+destination.dlq
deadLetterExchange
- A DLX to assign to the queue; relevant only if autoBindDlq is true. Only applies if requiredGroups are provided and then only to those groups.
Default: 'prefix+DLX'
deadLetterRoutingKey
- A dead-letter routing key to assign to the queue; relevant only if autoBindDlq is true. Only applies if requiredGroups are provided and then only to those groups.
Default: destination
declareExchange
- Whether to declare the exchange for the destination.
Default: true.
delay
- A SpEL expression to evaluate the delay to apply to the message (x-delay header). Has no effect if the exchange is not a delayed message exchange.
Default: No x-delay header is set.
delayedExchange
- Whether to declare the exchange as a Delayed Message Exchange. Requires the delayed message exchange plugin on the broker. The x-delayed-type argument is set to the exchangeType.
Default: false.
deliveryMode
- Delivery mode.
Default: PERSISTENT.
dlqDeadLetterExchange
- If a DLQ is declared, a DLX to assign to that queue. Only applies if requiredGroups are provided and then only to those groups.
Default: none
dlqDeadLetterRoutingKey
- If a DLQ is declared, a dead-letter routing key to assign to that queue. Only applies if requiredGroups are provided and then only to those groups.
Default: none
dlqExpires
- How long before an unused dead-letter queue is deleted (ms). Only applies if requiredGroups are provided and then only to those groups.
Default: no expiration
dlqLazy
- Declare the dead-letter queue with the x-queue-mode=lazy argument. See Lazy Queues. Consider using a policy instead of this setting, because a policy allows changing the setting without deleting the queue. Only applies if requiredGroups are provided and then only to those groups.
Default: false.
dlqMaxLength
- Maximum number of messages in the dead-letter queue. Only applies if requiredGroups are provided and then only to those groups.
Default: no limit
dlqMaxLengthBytes
- Maximum number of total bytes in the dead-letter queue from all messages. Only applies if requiredGroups are provided and then only to those groups.
Default: no limit
dlqMaxPriority
- Maximum priority of messages in the dead-letter queue (0-255). Only applies if requiredGroups are provided and then only to those groups.
Default: none
dlqTtl
- Default time to live to apply to the dead-letter queue when declared (ms). Only applies if requiredGroups are provided and then only to those groups.
Default: no limit
exchangeAutoDelete
- If declareExchange is true, whether the exchange should be auto-delete (removed after the last queue is removed).
Default: true.
exchangeDurable
- If declareExchange is true, whether the exchange should be durable (survives broker restart).
Default: true.
exchangeType
- The exchange type: direct, fanout or topic for non-partitioned destinations; direct or topic for partitioned destinations.
Default: topic.
expires
- How long before an unused queue is deleted (ms). Only applies if requiredGroups are provided and then only to those groups.
Default: no expiration
headerPatterns
- Patterns for headers to be mapped to outbound messages.
Default: ['*'] (all headers).
lazy
- Declare the queue with the x-queue-mode=lazy argument. See Lazy Queues. Consider using a policy instead of this setting, because a policy allows changing the setting without deleting the queue. Only applies if requiredGroups are provided and then only to those groups.
Default: false.
maxLength
- Maximum number of messages in the queue. Only applies if requiredGroups are provided and then only to those groups.
Default: no limit
maxLengthBytes
- Maximum number of total bytes in the queue from all messages. Only applies if requiredGroups are provided and then only to those groups.
Default: no limit
maxPriority
- Maximum priority of messages in the queue (0-255). Only applies if requiredGroups are provided and then only to those groups.
Default: none
prefix
- A prefix to be added to the name of the destination exchange.
Default: "".
queueNameGroupOnly
- When true, consume from a queue with a name equal to the group; otherwise the queue name is destination.group. This is useful, for example, when using Spring Cloud Stream to consume from an existing RabbitMQ queue. Only applies if requiredGroups are provided and then only to those groups.
Default: false.
routingKeyExpression
- A SpEL expression to determine the routing key to use when publishing messages. For a fixed routing key, use a literal expression, e.g. routingKeyExpression='my.routingKey' in a properties file, or routingKeyExpression: '''my.routingKey''' in a YAML file.
Default: destination or destination-<partition> for partitioned destinations.
transacted
- Whether to use transacted channels.
Default: false.
ttl
- Default time to live to apply to the queue when declared (ms). Only applies if requiredGroups are provided and then only to those groups.
Default: no limit
In the case of RabbitMQ, content type headers can be set by external applications. Spring Cloud Stream supports them as part of an extended internal protocol used for any type of transport (including transports, such as Kafka, that do not normally support headers).
38.4 Retry With the RabbitMQ Binder
38.4.1 Overview
When retry is enabled within the binder, the listener container thread is suspended for any back off periods that are configured. This might be important when strict ordering is required with a single consumer but for other use cases it prevents other messages from being processed on that thread. An alternative to using binder retry is to set up dead lettering with time to live on the dead-letter queue (DLQ), as well as dead-letter configuration on the DLQ itself. See Section 38.3.1, “RabbitMQ Binder Properties” for more information about the properties discussed here. Example configuration to enable this feature:
- Set autoBindDlq to true. The binder creates a DLQ; you can optionally specify a name in deadLetterQueueName.
- Set dlqTtl to the back-off time you want to wait between redeliveries.
- Set dlqDeadLetterExchange to the default exchange. Expired messages from the DLQ are routed to the original queue, because the default deadLetterRoutingKey is the queue name (destination.group).
To force a message to be dead-lettered, either throw an AmqpRejectAndDontRequeueException, or set requeueRejected to false (the default) and throw any exception.
The loop continues without end, which is fine for transient problems, but you may want to give up after some number of attempts. Fortunately, RabbitMQ provides the x-death header, which allows you to determine how many cycles have occurred.
To acknowledge a message after giving up, throw an ImmediateAcknowledgeAmqpException.
38.4.2 Putting it All Together
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
This configuration creates a topic exchange myDestination and a queue myDestination.consumerGroup bound to it with the wildcard routing key #. It also creates a DLQ bound to a direct exchange DLX with routing key myDestination.consumerGroup. When messages are rejected, they are routed to the DLQ. After 5 seconds, the message expires and is routed to the original queue using the queue name as the routing key.
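The wildcard binding works because of how topic exchanges match routing keys: keys are dot-separated words, # matches zero or more words, and * matches exactly one word. The sketch below illustrates those matching rules in plain Java; `TopicMatchSketch` is a hypothetical class written for this explanation, not broker or binder code.

```java
/** Hypothetical illustration of topic-exchange binding-key matching. */
final class TopicMatchSketch {

    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may consume zero or more words; try every possible split.
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;
        }
        // '*' consumes exactly one word; anything else must match literally.
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && match(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("#", "myDestination"));
        System.out.println(matches("*.created", "order.created"));
        System.out.println(matches("*.created", "order.item.created"));
    }
}
```

A queue bound with # therefore receives every message published to the exchange, whatever routing key the producer used.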
Spring Boot application.
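import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }
}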
Notice that the count property in the x-death header is a Long.
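The type matters because boxed equality in Java is type-sensitive: a Long never equals an Integer, even when the numeric values agree. The sketch below demonstrates this and shows one hedged alternative that compares through Number instead. `XDeathCountSketch` and `shouldGiveUp` are hypothetical names, and the use of `>=` rather than an exact match is a choice made for this illustration, not something the listener above does.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical helper showing a type-insensitive check of the x-death count. */
final class XDeathCountSketch {

    /** True once the count reaches the limit, whatever numeric type carries it. */
    static boolean shouldGiveUp(Map<String, ?> death, long maxDeaths) {
        if (death == null) {
            return false; // first delivery: no x-death header yet
        }
        Object count = death.get("count");
        return count instanceof Number && ((Number) count).longValue() >= maxDeaths;
    }

    public static void main(String[] args) {
        Object count = 3L; // boxed as java.lang.Long, like the header value
        System.out.println(count.equals(3));  // false: the argument boxes to Integer
        System.out.println(count.equals(3L)); // true: both sides are Long
        System.out.println(shouldGiveUp(Collections.singletonMap("count", 3L), 3));
    }
}
```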
38.5 Error Channels
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination, and can be configured to send async producer send failures to an error channel too. See the section called “Message Channel Binders and Error Channels” for more information.
With RabbitMQ, there are two types of send failures:
- returned messages
- negatively acknowledged Publisher Confirms
The latter is rare; quoting the RabbitMQ documentation: "[A nack] will only be delivered if an internal error occurs in the Erlang process responsible for a queue."
As well as enabling producer error channels as described in the section called “Message Channel Binders and Error Channels”, the RabbitMQ binder will only send messages to the channels if the connection factory is appropriately configured:
ccf.setPublisherConfirms(true);
ccf.setPublisherReturns(true);
When using Spring Boot configuration for the connection factory, set the following properties:
spring.rabbitmq.publisher-confirms
spring.rabbitmq.publisher-returns
The payload of the ErrorMessage for a returned message is a ReturnedAmqpMessageException with properties:
- failedMessage: the spring-messaging Message<?> that failed to be sent.
- amqpMessage: the raw spring-amqp Message.
- replyCode: an integer value indicating the reason for the failure (e.g. 312 - No route).
- replyText: a text value indicating the reason for the failure, e.g. NO_ROUTE.
- exchange: the exchange to which the message was published.
- routingKey: the routing key used when the message was published.
For negatively acknowledged confirms, the payload is a NackedAmqpMessageException with properties:
- failedMessage: the spring-messaging Message<?> that failed to be sent.
- nackReason: a reason (if available; you may need to examine the broker logs for more information).
There is no automatic handling of these exceptions (such as sending to a Dead-Letter queue); you can consume these exceptions with your own Spring Integration flow.
38.6 Dead-Letter Queue Processing
Because it can’t be anticipated how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original queue. However, if the problem is permanent, that could cause an infinite loop. The following Spring Boot application is an example of how to route those messages back to the original queue, moving them to a third "parking lot" queue after three attempts. The second example uses the RabbitMQ Delayed Message Exchange to introduce a delay to the requeued message. In this example, the delay increases for each attempt. These examples use a @RabbitListener to receive messages from the DLQ; you could also use RabbitTemplate.receive() in a batch process.
The examples assume the original destination is so8400in and the consumer group is so8400.
38.6.1 Non-Partitioned Destinations
The first two examples apply when the destination is not partitioned.
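import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            // Count this attempt and send the message back to the original queue
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            // Three attempts used up: park the message for manual inspection
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }
}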
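import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            // The delay grows with each attempt: 0 ms, 5000 ms, 10000 ms
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        // Route delayed messages from the delay exchange back to the original queue
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }
}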
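The delay schedule produced by the expression 5000 * retriesHeader in the listener above can be worked out by hand. The sketch below reproduces only that arithmetic; `DelayScheduleSketch` is a hypothetical class, and returning -1 to mean "parked" is a convention of this illustration.

```java
/** Hypothetical reproduction of the x-delay arithmetic used in the example above. */
final class DelayScheduleSketch {

    static final int MAX_RETRIES = 3;
    static final int BASE_DELAY_MS = 5000;

    /** Delay in ms for a message whose x-retries header has the given value; -1 if parked. */
    static int delayMillis(int retriesHeader) {
        if (retriesHeader >= MAX_RETRIES) {
            return -1; // no more retries: the message goes to the parking lot
        }
        return BASE_DELAY_MS * retriesHeader;
    }

    public static void main(String[] args) {
        for (int retries = 0; retries <= MAX_RETRIES; retries++) {
            int delay = delayMillis(retries);
            System.out.println("x-retries=" + retries + " -> "
                    + (delay < 0 ? "parking lot" : delay + " ms"));
        }
    }
}
```

Because the multiplier is the header value before it is incremented, the first republish carries no delay, and the delays total 15 seconds across the three attempts. To delay the first attempt as well, one option would be to multiply by retriesHeader + 1.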
38.6.2 Partitioned Destinations
With partitioned destinations, there is one DLQ for all partitions and we determine the original queue from the headers.
republishToDlq=false
When republishToDlq is false, RabbitMQ publishes the message to the DLX/DLQ with an x-death header containing information about the original destination.
import java.util.List;
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    private static final String X_DEATH_HEADER = "x-death";
    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @SuppressWarnings("unchecked")
    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            // The first x-death entry records where the message was originally published
            List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
            String exchange = (String) xDeath.get(0).get("exchange");
            List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
            this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }
}
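The listener above casts the x-death header without checks, which is fine for a demonstration but throws if the header is missing or shaped differently. A more defensive extraction might look like the sketch below. It uses only plain collections so that it runs on its own; `XDeathRoutingSketch` and its nested `Route` class are hypothetical, and the header layout (a list of maps with exchange and routing-keys entries) is the one the listener above relies on.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, defensive extraction of the original exchange and routing key from x-death. */
final class XDeathRoutingSketch {

    static final class Route {
        final String exchange;
        final String routingKey;

        Route(String exchange, String routingKey) {
            this.exchange = exchange;
            this.routingKey = routingKey;
        }
    }

    static Optional<Route> originalRoute(Map<String, Object> headers) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List) || ((List<?>) raw).isEmpty()) {
            return Optional.empty(); // header missing or empty
        }
        Object first = ((List<?>) raw).get(0);
        if (!(first instanceof Map)) {
            return Optional.empty();
        }
        Map<?, ?> entry = (Map<?, ?>) first;
        Object exchange = entry.get("exchange");
        Object keys = entry.get("routing-keys");
        if (!(exchange instanceof String) || !(keys instanceof List) || ((List<?>) keys).isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new Route((String) exchange, String.valueOf(((List<?>) keys).get(0))));
    }

    public static void main(String[] args) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("exchange", "so8400in");
        entry.put("routing-keys", Collections.singletonList("so8400in-1"));
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", Collections.singletonList(entry));

        originalRoute(headers).ifPresent(route ->
                System.out.println(route.exchange + " / " + route.routingKey));
    }
}
```

When the result is empty, sending the message to the parking-lot queue is a reasonable fallback, since the original destination cannot be determined.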
republishToDlq=true
When republishToDlq is true, the republishing recoverer adds the original exchange and routing key to headers.
import java.util.Map;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
    private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            // The republishing recoverer stored the original exchange and routing key in headers
            String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
            String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
            this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }
}