38. RabbitMQ Binder

38.1 用法

要使用 RabbitMQ binder,只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream application:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream RabbitMQ Starter。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

38.2 RabbitMQ Binder 概述

下面是 RabbitMQ binder 如何操作的简化图。

图 1_.RabbitMQ Binder

rabbit binder

RabbitMQ Binder implementation maps 将每个目标映射到TopicExchange。对于每个 consumer group,Queue将绑定到TopicExchange。每个 consumer 实例的 group 的Queue都有一个对应的 RabbitMQ Consumer实例。对于分区 producers/consumers,队列以分区索引为后缀,并使用分区索引作为路由 key。

使用autoBindDlq选项,您可以选择配置 binder 来创建和配置 dead-letter 队列(DLQ)(以及 dead-letter 交换DLX)。死信队列具有目标的 name,附加.dlq。如果启用了重试(maxAttempts > 1),则失败的消息将被传递到 DLQ。如果禁用重试(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。另外,republishToDlq会导致 binder 将失败的消息发布到 DLQ(而不是拒绝它);这样可以在_header 中的消息中添加其他信息,例如x-exception-stacktrace标头中的堆栈跟踪。此选项不需要重试启用;只需一次尝试即可重新发布失败的消息。从 version 1.2 开始,您可以配置重新发布的消息的传递模式;见 property republishDeliveryMode

requeueRejected设置为true将导致消息被重新排队并连续重新传送,除非故障问题是暂时的,否则这可能不是您想要的。通常,最好通过将maxAttempts设置为大于 1 来启用 binder 中的重试,或将republishToDlq设置为true

有关这些 properties 的更多信息,请参见第 38.3.1 节,“RabbitMQ Binder Properties”

framework 没有提供消耗 dead-letter 消息的任何标准机制(或者 re-route 消息返回主队列)。一些选项在第 38.6 节,“Dead-Letter 队列处理”中描述。

当在 Spring Cloud Stream application 中使用多个 RabbitMQ binders 时,禁用'RabbitAutoConfiguration'以避免将RabbitAutoConfiguration应用于两个 binders 的相同配置非常重要。

从 version 1.3 开始,RabbitMessageChannelBinder为 non-transactional 生成器创建一个内部ConnectionFactory副本,以避免在共享时消费者死锁,由于 Broker 上的Memory 报警而阻塞了高速缓存连接。

38.3 Configuration 选项

此部分包含特定于 RabbitMQ Binder 和绑定 channels 的设置。

有关常规 binding configuration 选项和 properties,请参阅Spring Cloud Stream 核心文档

38.3.1 RabbitMQ Binder Properties

默认情况下,RabbitMQ binder 使用 Spring Boot 的ConnectionFactory,因此它支持 RabbitMQ 的所有 Spring Boot configuration 选项。 (对于 reference,请参阅Spring Boot 文档 .) RabbitMQ configuration 选项使用spring.rabbitmq前缀。

除了 Spring Boot 选项之外,RabbitMQ binder 还支持以下 properties:

  • spring.cloud.stream.rabbit.binder.adminAddresses

    • 一个 comma-separated RabbitMQ management 插件 URL 列表。仅在nodes包含多个条目时使用。此列表中的每个条目都必须在spring.rabbitmq.addresses中具有相应的条目。 **仅在您使用 RabbitMQ cluster 并希望从承载队列的节点使用时才需要.**有关详细信息,请参阅 Queue Affinity 和 LocalizedQueueConnectionFactory.

默认值:空。

  • spring.cloud.stream.rabbit.binder.nodes

    • RabbitMQ 节点名称的 comma-separated 列表。当多个条目用于查找队列所在的服务器地址时。此列表中的每个条目都必须在spring.rabbitmq.addresses中具有相应的条目。 **仅在您使用 RabbitMQ cluster 并希望从承载队列的节点使用时才需要.**有关详细信息,请参阅 Queue Affinity 和 LocalizedQueueConnectionFactory.

默认值:空。

  • spring.cloud.stream.rabbit.binder.compressionLevel

    • 压缩绑定的压缩 level。见java.util.zip.Deflater

默认值:1(BESTLEVEL)。

38.3.2 RabbitMQ Consumer Properties

以下 properties 仅适用于 Rabbit 使用者,必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.为前缀。

  • acknowledgeMode

    • 确认模式。

默认值:AUTO

  • autoBindDlq

    • 是否自动声明 DLQ 并将其绑定到 binder DLX。

默认值:false

  • bindingRoutingKey

    • 用于将队列绑定到交换的路由 key(如果bindQueuetrue)。对于分区目的地-<instanceIndex>将被追加。

默认值:#

  • bindQueue

    • 是否将队列绑定到目标交换;如果您已设置自己的基础架构并且之前已经 created/bound 队列,则设置为false

默认值:true

  • deadLetterQueueName

    • DLQ 的 name

默认值:prefix+destination.dlq

  • deadLetterExchange

    • 要分配给队列的 DLX;如果 autoBindDlq 是 true

默认值:'前缀 DLX'

  • deadLetterRoutingKey

    • 一个死信路由 key 分配给队列;如果 autoBindDlq 是 true

默认值:destination

  • declareExchange

    • 是否申报目的地的交换。

默认值:true

  • delayedExchange

    • 是否将交换声明为Delayed Message Exchange - 需要 broker 上的延迟消息交换插件。 x-delayed-type参数设置为exchangeType

默认值:false

  • dlqDeadLetterExchange

    • 如果声明了 DLQ,则分配给该队列的 DLX

默认值:none

  • dlqDeadLetterRoutingKey

    • 如果声明了 DLQ,则将死信路由 key 分配给该队列;默认 none

默认值:none

  • dlqExpires

    • 删除未使用的死信队列前的__(_)

默认值:no expiration

  • dlqLazy

    • 使用x-queue-mode=lazy参数声明死信队列。见懒惰的队列。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。

默认值:false

  • dlqMaxLength

    • 死信队列中的最大消息数

默认值:no limit

  • dlqMaxLengthBytes

    • 所有消息中死信队列中的最大总字节数

默认值:no limit

  • dlqMaxPriority

    • 死信队列中的消息的最大优先级(0-255)

默认值:none

  • dlqTtl

    • 声明时默认 time 生效以应用于死信队列(ms)

默认值:no limit

  • durableSubscription

    • 订阅是否应该持久。仅在设置group时有效。

默认值:true

  • exchangeAutoDelete

    • 如果declareExchange是 true,则交换是否应为 auto-delete(在删除最后一个队列后删除)。

默认值:true

  • exchangeDurable

    • 如果declareExchange是 true,那么交换是否应该是持久的(幸存 broker 重启)。

默认值:true

  • exchangeType

    • 交换类型;对于 non-partitioned 目的地directfanouttopic; directtopic用于分区目的地。

默认值:topic

  • 独家

    • 创建一个独有的 consumer;当这是true时,并发应该是 1;通常在需要严格的 ordering 时使用,但在发生故障后启用热备用实例。请参阅recoveryInterval,它控制备用实例尝试使用的频率。

默认值:false

  • 到期

    • 在删除未使用的队列之前如何 long(ms)

默认值:no expiration

  • failedDeclarationRetryInterval

    • 队列消耗尝试之间的间隔(ms)(如果缺少)。

默认值:5000

  • headerPatterns

    • 要从入站消息映射 headers 的模式。

默认值:['*'](所有 headers)。

    • 使用x-queue-mode=lazy参数声明队列。见懒惰的队列。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。

默认值:false

  • maxConcurrency

    • 最大消费者数量

默认值:1

  • 最长长度

    • 队列中的最大消息数

默认值:no limit

  • maxLengthBytes

    • 所有消息中队列中的最大总字节数

默认值:no limit

  • maxPriority

    • 队列中消息的最大优先级(0-255)
  • 默认

    • none
  • missingQueuesFatal

    • 如果找不到队列,请将条件视为致命,并停止 listener 容器。默认为false,以便容器继续尝试从队列中使用,例如,当使用 cluster 并且托管非 HA 队列的节点关闭时。
  • 默认

    • false
  • 预取

    • 预取计数。

默认值:1

  • 字首

    • 要添加到destination和队列的 name 的前缀。

默认值:“”。

  • queueDeclarationRetries

    • 如果丢失,则从队列中重试 consuming 的次数。仅在missingQueuesFataltrue时才相关;否则容器会无限期地重试。
  • 默认

    • 3
  • queueNameGroupOnly

    • 当 true 时,从 name 等于group的队列中消耗;否则队列 name 是destination.group。这很有用,例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时。

默认值:false。

  • recoveryInterval

    • 连接恢复尝试之间的间隔,以毫秒为单位。

默认值:5000

  • requeueRejected

    • 在禁用重试或 republishToDlq 为 false 时是否应重新排队传递失败。

默认值:false

  • republishDeliveryMode

    • republishToDlqtrue时,指定重新发布的消息的传递模式。

默认值:DeliveryMode.PERSISTENT

  • republishToDlq

    • 默认情况下,拒绝重试后失败的消息。如果配置了 dead-letter 队列(DLQ),RabbitMQ 将把失败的消息(未更改)路由到 DLQ。如果设置为true,binder 将使用其他 headers 将失败的消息重新发布到 DLQ,包括 exception 消息和最终失败原因的堆栈跟踪。

默认值:false

  • 交易

    • 是否使用事务处理 channels。

默认值:false

  • TTL

    • 声明时默认 time 生效以应用于队列(ms)

默认值:no limit

  • txSize

    • ack 之间的交付数量。

默认值:1

38.3.3 Rabbit Producer Properties

以下 properties 仅适用于 Rabbit 生成器,必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.为前缀。

  • autoBindDlq

    • 是否自动声明 DLQ 并将其绑定到 binder DLX。

默认值:false

  • batchingEnabled

    • 是否启用生产者的消息批处理。

默认值:false

  • BATCHSIZE

    • 启用批处理时要缓冲的消息数。

默认值:100

  • batchBufferLimit

    • 默认值:10000
  • batchTimeout

    • 默认值:5000
  • bindingRoutingKey

    • 用于将队列绑定到交换的路由 key(如果bindQueuetrue)。仅适用于 non-partitioned 目的地。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:#

  • bindQueue

    • 是否将队列绑定到目标交换;如果您已设置自己的基础架构并且之前已 created/bound 队列,则设置为false。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:true

  • 压缩

    • 是否应在发送时压缩数据。

默认值:false

  • deadLetterQueueName

    • DLQ 的 name 仅在提供requiredGroups时适用,然后仅适用于这些组。

默认值:prefix+destination.dlq

  • deadLetterExchange

    • 要分配给队列的 DLX;如果 autoBindDlq 是 true 仅在提供requiredGroups时才适用,然后仅适用于那些组。

默认值:'前缀 DLX'

  • deadLetterRoutingKey

    • 一个死信路由 key 分配给队列;如果 autoBindDlq 是 true 仅在提供requiredGroups时才适用,然后仅适用于那些组。

默认值:destination

  • declareExchange

    • 是否申报目的地的交换。

默认值:true

  • 延迟

    • 用于评估应用于消息的延迟(x-delay标头)的 SpEL 表达式 - 如果交换不是延迟消息交换,则无效。

默认值:未设置x-delay标头。

  • delayedExchange

    • 是否将交换声明为Delayed Message Exchange - 需要 broker 上的延迟消息交换插件。 x-delayed-type参数设置为exchangeType

默认值:false

  • deliveryMode

    • 交货方式。

默认值:PERSISTENT

  • dlqDeadLetterExchange

    • 如果声明了 DLQ,则分配给该队列的 DLX 仅在提供requiredGroups时才适用,然后仅适用于那些组。

默认值:none

  • dlqDeadLetterRoutingKey

    • 如果声明了 DLQ,则将死信路由 key 分配给该队列; default none 仅在提供requiredGroups时适用,然后仅适用于这些组。

默认值:none

  • dlqExpires

    • 如何删除未使用的死信队列前的__(ms)仅在提供requiredGroups时才适用,然后仅适用于那些组。

默认值:no expiration

  • dlqLazy

    • 使用x-queue-mode=lazy参数声明死信队列。见懒惰的队列。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。仅在提供requiredGroups时才适用,然后仅适用于这些组。
  • dlqMaxLength

    • 死信队列中的最大消息数仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no limit

  • dlqMaxLengthBytes

    • 所有消息的死信队列中的最大总字节数仅在提供requiredGroups时适用,然后仅适用于这些组。

默认值:no limit

  • dlqMaxPriority

    • 死信队列中消息的最大优先级(0-255)仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:none

  • dlqTtl

    • 声明时应用于死信队列的默认 time(ms)仅在提供requiredGroups时才适用,然后仅适用于那些组。

默认值:no limit

  • exchangeAutoDelete

    • 如果declareExchange是 true,则交换是否应为 auto-delete(在删除最后一个队列后删除)。

默认值:true

  • exchangeDurable

    • 如果declareExchange是 true,那么交换是否应该是持久的(幸存 broker 重启)。

默认值:true

  • exchangeType

    • 交换类型;对于 non-partitioned 目的地directfanouttopic; directtopic用于分区目的地。

默认值:topic

  • 到期

    • 如何删除未使用的队列之前的__(ms)仅在提供requiredGroups时才适用,然后仅适用于那些组。

默认值:no expiration

  • headerPatterns

    • headers 的模式映射到出站消息。

默认值:['*'](所有 headers)。

    • 使用x-queue-mode=lazy参数声明队列。见懒惰的队列。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:false

  • 最长长度

    • 队列中的最大消息数仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no limit

  • maxLengthBytes

    • 所有消息中队列中的最大总字节数仅在提供requiredGroups时适用,然后仅适用于这些组。

默认值:no limit

  • maxPriority

    • 队列中消息的最大优先级(0-255)仅在提供requiredGroups时才适用,然后仅适用于这些组。
  • 默认

    • none
  • 字首

    • 要添加到destination交换的 name 的前缀。

默认值:“”。

  • queueNameGroupOnly

    • 当 true 时,从 name 等于group的队列中消耗;否则队列 name 是destination.group。这很有用,例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:false。

  • routingKeyExpression

    • 一个 SpEL 表达式,用于确定发布消息时要使用的路由 key。对于固定路由 key,请使用文字表达式 e.g. _pro在 properties 文件中,或routingKeyExpression: '''my.routingKey'''在 YAML 文件中。

默认值:destinationdestination-<partition>表示分区目的地。

  • 交易

    • 是否使用事务处理 channels。

默认值:false

  • TTL

    • 声明时应用于队列的默认 time(ms)仅在提供requiredGroups时才适用,然后仅适用于那些组。

默认值:no limit

在 RabbitMQ 的情况下,content type headers 可以由外部 applications 设置。 Spring Cloud Stream 支持它们作为扩展内部协议的一部分,用于任何类型的传输(包括传输,例如 Kafka,通常不支持 headers)。

38.4 使用 RabbitMQ Binder 重试

38.4.1 概述

在 binder 中启用重试时,listener 容器线程将暂停以配置任何后退时段。当单个 consumer 需要严格的 ordering 时,这可能很重要,但对于其他用例,它可以防止在该线程上处理其他消息。使用 binder 重试的另一种方法是使用 time 设置死字法以生活在 dead-letter 队列(DLQ)上,以及 DLQ 本身上的 dead-letter configuration。有关此处讨论的 properties 的更多信息,请参见第 38.3.1 节,“RabbitMQ Binder Properties”。 Example configuration 以启用此 feature:

  • autoBindDlq设置为true - binder 将创建 DLQ;您可以选择在deadLetterQueueName中指定 name

  • dlqTtl设置为您想要在重新传送之间等待的__ back

  • dlqDeadLetterExchange设置为默认交换 - 来自 DLQ 的过期消息将路由到原始队列,因为默认deadLetterRoutingKey是队列 name(destination.group)

要强制消息为 dead-lettered,要么抛出AmqpRejectAndDontRequeueException,要么将requeueRejected设置为true并抛出任何 exception。

循环将继续没有结束,这对于瞬态问题很好,但是你可能想在经过一些尝试后放弃。幸运的是,RabbitMQ 提供x-death标头,允许您确定发生了多少个循环。

要在放弃后确认消息,请抛出ImmediateAcknowledgeAmqpException

38.4.2 全部放在一起

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此 configuration 创建一个 exchange myDestination,其队列myDestination.consumerGroup绑定到带有通配符路由 key #的 topic 交换。它创建一个 DLQ 绑定到直接交换DLX与路由 key myDestination.consumerGroup。当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,消息到期并使用 queue name 作为路由 key 路由到原始队列。

Spring Boot application.

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-death标头中的 count property 是Long

38.5 错误 Channels

从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且可以配置为将异步 producer 发送失败发送到错误 channel。有关更多信息,请参见名为“Message Channel Binder and Error Channels”的部分

使用 rabbitmq,有两种类型的发送失败:

后者很少见;引用 RabbitMQ 文档“[228]只有在负责队列的 Erlang process 中发生内部错误时才会被传递。”

除了中所述启用 producer 错误 channels 之外,如果连接工厂配置正确,RabbitMQ binder 将仅向 channels 发送消息:

  • ccf.setPublisherConfirms(true);

  • ccf.setPublisherReturns(true);

将 spring boot configuration 用于连接工厂时,请设置 properties:

  • spring.rabbitmq.publisher-confirms

  • spring.rabbitmq.publisher-returns

返回消息的ErrorMessage的有效负载是带有 properties 的ReturnedAmqpMessageException

  • failedMessage - 无法发送的 spring-messaging Message<?>

  • amqpMessage - 原始 spring-amqp Message

  • replyCode - integer value 指示失败的原因(e.g. 312 - 否 route)

  • replyText - 表示失败原因 e.g 的文本 value。 NO_ROUTE

  • exchange - 消息发布的交换。

  • routingKey - 发布消息时使用的路由 key。

对于否定确认的确认,有效负载是带有 properties 的NackedAmqpMessageException

  • failedMessage - 无法发送的 spring-messaging Message<?>

  • nackReason - 一个原因(如果可用;您可能需要检查 broker 日志以获取更多信息)。

没有自动处理这些 exceptions(例如发送到Dead-Letter 队列);您可以使用自己的 Spring Integration 流来使用这些 exceptions。

38.6 Dead-Letter 队列处理

由于无法预测用户如何处理 dead-lettered 消息,framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始队列。但是,如果问题是一个永久性问题,那么可能会导致无限循环。以下spring-boot application 是如何将这些消息重新路由回原始队列的示例,但在三次尝试后将它们移动到第三个“停车场”队列。第二个 example 使用RabbitMQ 延迟消息交换为重新排队的消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用@RabbitListener从 DLQ 接收消息,您也可以在批处理 process 中使用RabbitTemplate.receive()

这些示例假设原始目标是so8400in,consumer group 是so8400

38.6.1 Non-Partitioned 目的地

前两个示例是目标**未分区的情况。

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

38.6.2 分区目的地

对于分区目标,所有分区都有一个 DLQ,我们从 headers 确定原始队列。

republishToDlq=false

republishToDlqfalse时,RabbitMQ 使用包含有关原始目标的信息的x-death标头将消息发布到 DLX/DLQ。

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

republishToDlq=true

republishToDlqtrue时,重新发布恢复器会将原始交换和路由 key 添加到 headers。

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}