41. RabbitMQ Binder

41.1 Usage

要使用 RabbitMQ 绑定器,可以通过使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

41.2 RabbitMQBinder 概述

以下简化图显示了 RabbitMQ 绑定程序的工作方式:

图 41.1. RabbitMQBinder

rabbit binder

默认情况下,RabbitMQ Binder 实现将每个目标 Map 到TopicExchange。对于每个 Consumer 组,Queue都绑定到该TopicExchange。每个使用者实例都有对应于其组Queue的 RabbitMQ Consumer实例。对于分区的生产者和使用者,队列以分区索引为后缀,并使用分区索引作为路由键。对于匿名使用者(没有group属性的使用者),将使用自动删除队列(具有随机的唯一名称)。

通过使用可选的autoBindDlq选项,您可以配置 Binder 以创建和配置死信队列(DLQ)(以及死信交换机DLX以及路由基础结构)。默认情况下,死信队列具有目标名称,后跟.dlq。如果启用了重试(maxAttempts > 1),则在重试用尽后,失败的消息将传递到 DLQ。如果禁用了重试(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。另外,republishToDlq使绑定程序将失败的消息发布到 DLQ(而不是拒绝它)。通过此功能,可以将其他信息(例如x-exception-stacktrace头中的堆栈跟踪)添加到头中的消息中。此选项不需要启用重试。只需尝试一次,您就可以重新发布失败的消息。从 1.2 版开始,您可以配置重新发布邮件的传递模式。参见republishDeliveryMode property

Tip

requeueRejected设置为true(使用republishToDlq=false)会导致消息被重新排队并连续重新发送,除非失败的原因是短暂的,否则这可能不是您想要的。通常,应通过将maxAttempts设置为大于 1 或将republishToDlq设置为true来在 Binder 中启用重试。

有关这些属性的更多信息,请参见第 41.3.1 节“ RabbitMQBinder 属性”

该框架没有提供任何标准机制来使用死信消息(或将其重新路由回主队列)。 第 41.6 节“死信队列处理”中描述了一些选项。

Note

当在 Spring Cloud Stream 应用程序中使用多个 RabbitMQ 绑定程序时,重要的是禁用“ RabbitAutoConfiguration”以避免将来自RabbitAutoConfiguration的相同配置应用于两个绑定程序。您可以使用@SpringBootApplicationComments 排除该类。

从 2.0 版开始,RabbitMessageChannelBinderRabbitTemplate.userPublisherConnection属性设置为true,以便非事务生产方避免对使用方造成死锁,如果由于代理上的memory alarm而阻止了缓存的连接,则可能会发生死锁。

Note

当前,仅multiplex使用者(一个使用者正在收听多个队列)仅受消息驱动的使用者支持;被轮询的使用者只能从单个队列中检索消息。

41.3 配置选项

本节包含特定于 RabbitMQBinder 和绑定通道的设置。

有关常规绑定配置选项和属性,请参见Spring Cloud Stream 核心文档

41.3.1 RabbitMQBinder 属性

默认情况下,RabbitMQ 绑定器使用 Spring Boot 的ConnectionFactory。一致地,它支持 RabbitMQ 的所有 Spring Boot 配置选项。 (有关参考,请参见Spring Boot 文档)。 RabbitMQ 配置选项使用spring.rabbitmq前缀。

除了 Spring Boot 选项之外,RabbitMQ 绑定器还支持以下属性:

  • spring.cloud.stream.rabbit.binder.adminAddresses

    • 以逗号分隔的 RabbitMQManagement 插件 URL 列表。仅在nodes包含多个条目时使用。此列表中的每个条目都必须在spring.rabbitmq.addresses中具有一个对应的条目。仅当您使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。有关更多信息,请参见队列亲和力和 LocalizedQueueConnectionFactory

Default: empty.

  • spring.cloud.stream.rabbit.binder.nodes

    • 以逗号分隔的 RabbitMQ 节点名称列表。多个条目时,用于定位队列所在的服务器地址。此列表中的每个条目都必须在spring.rabbitmq.addresses中具有一个对应的条目。仅当您使用 RabbitMQ 集群并希望从托管队列的节点使用时才需要。有关更多信息,请参见队列亲和力和 LocalizedQueueConnectionFactory

Default: empty.

  • spring.cloud.stream.rabbit.binder.compressionLevel

    • 压缩绑定的压缩级别。参见java.util.zip.Deflater

默认值:1(BEST_LEVEL)。

  • spring.cloud.stream.binder.connection-name-prefix

    • 连接名称前缀,用于命名此绑定程序创建的连接。名称是此前缀,后跟#n,其中每次打开新连接时n递增。

默认值:无(Spring AMQP 默认值)。

41.3.2 RabbitMQConsumer 属性

以下属性仅适用于 RabbitConsumer,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.作为前缀。

  • acknowledgeMode

    • 确认模式。

默认值:AUTO

  • autoBindDlq

    • 是否自动声明 DLQ 并将其绑定到绑定器 DLX。

默认值:false

  • bindingRoutingKey

    • 用于将队列绑定到交换机的路由密钥(如果bindQueuetrue)。对于分区的目标,附加-<instanceIndex>

默认值:#

  • bindQueue

    • 是否将队列绑定到目标交换机。如果您已经设置了自己的基础结构并且先前已创建并绑定了队列,请将其设置为false

默认值:true

  • consumerTagPrefix

    • 用于创建 Consumer 标签;将在每个创建的使用者之后加上#n,其中n递增。示例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}

默认值:无-broker 将生成随机的 Consumer 标签。

  • deadLetterQueueName

    • DLQ 的名称

默认值:prefix+destination.dlq

  • deadLetterExchange

    • 分配给队列的 DLX。仅当autoBindDlqtrue时才相关。

Default: 'prefix+DLX'

  • deadLetterExchangeType

    • 分配给队列的 DLX 的类型。仅当autoBindDlqtrue时才相关。

Default: 'direct'

  • deadLetterRoutingKey

    • 分配给队列的死信路由键。仅当autoBindDlqtrue时才相关。

默认值:destination

  • declareDlx

    • 是否声明目的地交换死信。仅当autoBindDlqtrue时才相关。如果您有预配置的 DLX,请设置为false

默认值:true

  • declareExchange

    • 是否声明目的地交换。

默认值:true

  • delayedExchange

    • 是否将交换声明为Delayed Message Exchange。在代理上需要延迟的消息交换插件。 x-delayed-type参数设置为exchangeType

默认值:false

  • dlqDeadLetterExchange

    • 如果声明了 DLQ,则分配给该队列的 DLX。

默认值:none

  • dlqDeadLetterRoutingKey

    • 如果声明了 DLQ,则为该队列分配一个死信路由密钥。

默认值:none

  • dlqExpires

    • 删除未使用的死信队列的时间(以毫秒为单位)。

默认值:no expiration

  • dlqLazy

    • x-queue-mode=lazy参数声明死信队列。参见"Lazy Queues"。考虑使用策略而不是此设置,因为使用策略允许更改设置而不删除队列。

默认值:false

  • dlqMaxLength

    • 死信队列中的最大消息数。

默认值:no limit

  • dlqMaxLengthBytes

    • 所有消息中的死信队列中的最大总字节数。

默认值:no limit

  • dlqMaxPriority

    • 死信队列中消息的最大优先级(0-255)。

默认值:none

  • dlqOverflowBehavior

    • 超过dlqMaxLengthdlqMaxLengthBytes时应采取的行动;当前为drop-headreject-publish,但请参考 RabbitMQ 文档。

默认值:none

  • dlqTtl

    • 声明时应用于死信队列的默认生存时间(以毫秒为单位)。

默认值:no limit

  • durableSubscription

    • 订阅是否应持久。仅当还设置了group时才有效。

默认值:true

  • exchangeAutoDelete

    • 如果declareExchange为 true,则是否应自动删除交换(即在删除最后一个队列之后将其删除)。

默认值:true

  • exchangeDurable

    • 如果declareExchange为 true,则该交换是否应该持久(即,在代理重新启动后仍然存在)。

默认值:true

  • exchangeType

    • 交换类型:非分区目标为directfanouttopic,分区目标为directtopic

默认值:topic

  • exclusive

    • 是否创建独家 Consumer。当为true时,并发应为 1.通常在需要严格 Order 但使热备用实例在发生故障后接管时使用。请参阅recoveryInterval,它控制备用实例尝试使用的频率。

默认值:false

  • expires

    • 删除未使用的队列的时间(以毫秒为单位)。

默认值:no expiration

  • failedDeclarationRetryInterval

    • 丢失队列时尝试消耗队列之间的时间间隔(以毫秒为单位)。

Default: 5000

  • headerPatterns

    • 从入站邮件 MapHeaders 的模式。

默认值:['*'](所有 Headers)。

  • lazy

    • 使用x-queue-mode=lazy参数声明队列。参见"Lazy Queues"。考虑使用策略而不是此设置,因为使用策略允许更改设置而不删除队列。

默认值:false

  • maxConcurrency

    • 最大 Consumer 数。

默认值:1

  • maxLength

    • 队列中的最大消息数。

默认值:no limit

  • maxLengthBytes

    • 来自所有消息的队列中的最大总字节数。

默认值:no limit

  • maxPriority

    • 队列中消息的最大优先级(0-255)。

默认值:none

  • missingQueuesFatal

    • 当找不到队列时,是否将条件视为致命并停止侦听器容器。默认值为false,以便容器 continue 尝试从队列中使用数据,例如在使用群集且承载非 HA 队列的节点关闭时。

默认值:false

  • overflowBehavior

    • 超过maxLengthmaxLengthBytes时应采取的行动;当前为drop-headreject-publish,但请参考 RabbitMQ 文档。

默认值:none

  • prefetch

    • Prefetch count.

默认值:1

  • prefix

    • 要添加到destination和队列名称的前缀。

Default: "".

  • queueDeclarationRetries

    • 缺少队列时重试消耗的次数。仅在missingQueuesFataltrue时相关。否则,容器将无限期地重试。

默认值:3

  • queueNameGroupOnly

    • 为 true 时,从名称等于group的队列中使用。否则,队列名称为destination.group。例如,在使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时,这很有用。

Default: false.

  • recoveryInterval

    • 连接恢复尝试之间的时间间隔(以毫秒为单位)。

默认值:5000

  • requeueRejected

    • 禁用重试或republishToDlqfalse时是否应重新排定交付失败。

默认值:false

  • republishDeliveryMode

    • republishToDlqtrue时,指定重新发布的邮件的传递方式。

默认值:DeliveryMode.PERSISTENT

  • republishToDlq

    • 默认情况下,拒绝重试后失败的消息将被拒绝。如果配置了死信队列(DLQ),RabbitMQ 会将失败的消息(未更改)路由到 DLQ。如果设置为true,则绑定程序将使用其他 Headers 将失败的消息重新发布到 DLQ,包括异常消息和来自最终失败原因的堆栈跟踪。

Default: false

  • transacted

    • 是否使用 TransactionChannel。

默认值:false

  • ttl

    • 声明时应用于队列的默认生存时间(以毫秒为单位)。

默认值:no limit

  • txSize

    • 两次之间的分娩次数。

默认值:1

41.3.3 高级侦听器容器配置

要设置未公开为绑定程序或绑定属性的侦听器容器属性,请向应用程序上下文中添加类型为ListenerContainerCustomizer的单个 bean。将设置 Binder 和绑定属性,然后将调用定制程序。定制程序(configure()方法)具有队列名称以及使用者组作为参数。

41.3.4 Rabbit Producer 属性

以下属性仅适用于 Rabbit 生产者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.作为前缀。

  • autoBindDlq

    • 是否自动声明 DLQ 并将其绑定到绑定器 DLX。

默认值:false

  • batchingEnabled

    • 是否启用生产者的邮件批处理。消息根据以下属性(在此列表的后三个条目中进行描述)被批处理为一条消息:'batchSize',batchBufferLimitbatchTimeout。有关更多信息,请参见Batching

默认值:false

  • batchSize

    • 启用批处理时要缓冲的消息数。

默认值:100

  • batchBufferLimit

    • 启用批处理时的最大缓冲区大小。

默认值:10000

  • batchTimeout

    • 启用批处理时的批处理超时。

默认值:5000

  • bindingRoutingKey

    • 用于将队列绑定到交换机的路由密钥(如果bindQueuetrue)。仅适用于未分区的目的地。仅在提供requiredGroups后才适用,然后仅适用于那些组。

默认值:#

  • bindQueue

    • 是否将队列绑定到目标交换机。如果您已经设置了自己的基础结构并且先前已创建并绑定了队列,请将其设置为false。仅在提供requiredGroups后才适用,然后仅适用于那些组。

默认值:true

  • compress

    • 发送时是否应压缩数据。

默认值:false

  • deadLetterQueueName

    • DLQ 的名称仅在提供requiredGroups后才适用,然后仅适用于这些组。

默认值:prefix+destination.dlq

  • deadLetterExchange

    • 分配给队列的 DLX。仅在autoBindDlqtrue时相关。仅在提供requiredGroups时适用,然后仅适用于那些组。

Default: 'prefix+DLX'

  • deadLetterExchangeType

    • 分配给队列的 DLX 的类型。仅当autoBindDlqtrue时才相关。仅在提供requiredGroups时适用,然后仅适用于那些组。

Default: 'direct'

  • deadLetterRoutingKey

    • 分配给队列的死信路由键。仅在autoBindDlqtrue时相关。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:destination

  • declareDlx

    • 是否声明目的地交换死信。仅当autoBindDlqtrue时才相关。如果您有预配置的 DLX,请设置为false。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:true

  • declareExchange

    • 是否声明目的地交换。

默认值:true

  • delayExpression

    • 一个 SpEL 表达式,用于评估应用于消息的延迟(x-delayHeaders)。如果交换不是延迟的消息交换,则无效。

默认值:未设置x-delayHeaders。

  • delayedExchange

    • 是否将交换声明为Delayed Message Exchange。在代理上需要延迟的消息交换插件。 x-delayed-type参数设置为exchangeType

默认值:false

  • deliveryMode

    • 交付方式。

默认值:PERSISTENT

  • dlqDeadLetterExchange

    • 声明 DLQ 后,将分配给该队列的 DLX。仅在提供requiredGroups后才适用,然后仅对那些组适用。

默认值:none

  • dlqDeadLetterRoutingKey

    • 声明 DLQ 后,分配给该队列的死信路由密钥。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:none

  • dlqExpires

    • 删除未使用的死信队列之前的时间(以毫秒为单位)。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:no expiration

  • dlqLazy

    • x-queue-mode=lazy参数声明死信队列。参见"Lazy Queues"。考虑使用策略而不是此设置,因为使用策略允许更改设置而不删除队列。仅在提供requiredGroups时适用,然后仅适用于那些组。
  • dlqMaxLength

    • 死信队列中的最大消息数。仅在提供requiredGroups后才适用,然后仅对那些组适用。

默认值:no limit

  • dlqMaxLengthBytes

    • 所有消息中的死信队列中的最大总字节数。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:no limit

  • dlqMaxPriority

    • 死信队列中邮件的最大优先级(0-255)仅在提供requiredGroups时才适用,然后仅适用于那些组。

默认值:none

  • dlqTtl

    • 声明时应用于死信队列的默认生存时间(以毫秒为单位)。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:no limit

  • exchangeAutoDelete

    • 如果declareExchangetrue,则是否应该自动删除交换(在删除最后一个队列之后将其删除)。

默认值:true

  • exchangeDurable

    • 如果declareExchangetrue,则交换是否应该持久(在代理重新启动后生存)。

默认值:true

  • exchangeType

    • 交换类型:非分区目标为directfanouttopic,分区目标为directtopic

默认值:topic

  • expires

    • 删除未使用的队列之前的时间(以毫秒为单位)。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:no expiration

  • headerPatterns

    • Headers 要 Map 到出站邮件的模式。

默认值:['*'](所有 Headers)。

  • lazy

    • 使用x-queue-mode=lazy参数声明队列。参见"Lazy Queues"。考虑使用策略而不是此设置,因为使用策略允许更改设置而不删除队列。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:false

  • maxLength

    • 队列中的最大消息数。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:no limit

  • maxLengthBytes

    • 来自所有消息的队列中的最大总字节数。仅在提供requiredGroups后才适用,然后仅适用于那些组。

默认值:no limit

  • maxPriority

    • 队列中消息的最大优先级(0-255)。仅在提供requiredGroups后才适用,然后仅适用于那些组。

默认值:none

  • prefix

    • 要添加到destination交换名称的前缀。

Default: "".

  • queueNameGroupOnly

    • true时,从名称等于group的队列中使用。否则,队列名称为destination.group。例如,在使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时,这很有用。仅在提供requiredGroups时适用,然后仅适用于那些组。

Default: false.

  • routingKeyExpression

    • 一个 SpEL 表达式,用于确定发布消息时要使用的路由密钥。对于固定的路由键,请使用 Literals 表达式,例如属性文件中的routingKeyExpression='my.routingKey'或 YAML 文件中的routingKeyExpression: '''my.routingKey'''

默认值:destinationdestination-<partition>(用于分区目标)。

  • transacted

    • 是否使用 TransactionChannel。

默认值:false

  • ttl

    • 声明时适用于队列的默认生存时间(以毫秒为单位)。仅在提供requiredGroups时适用,然后仅适用于那些组。

默认值:no limit

Note

对于 RabbitMQ,可以由外部应用程序设置 Content TypeHeaders。 Spring Cloud Stream 支持它们作为扩展内部协议的一部分,该协议用于任何类型的传输,包括不支持 Headers 的传输,例如 Kafka(0.11 之前的版本)。

41.4 使用 RabbitMQBinder 重试

当在 Binder 中启用重试时,侦听器容器线程将在配置的任何退避期间暂停。当需要单个 Consumer 进行严格 Order 时,这可能很重要。但是,对于其他用例,它阻止在该线程上处理其他消息。使用 Binder 重试的另一种方法是设置带有时间的无效字母,以保留在无效字母队列(DLQ)中,并在 DLQ 本身上进行无效字母配置。有关此处讨论的属性的更多信息,请参见“ 第 41.3.1 节“ RabbitMQBinder 属性””。您可以使用以下示例配置来启用此功能:

  • autoBindDlq设置为true。Binder 创建一个 DLQ。 (可选)您可以在deadLetterQueueName中指定名称。

  • dlqTtl设置为您要在两次重新交付之间 await 的退避时间。

  • dlqDeadLetterExchange设置为默认交换。由于默认deadLetterRoutingKey是队列名(destination.group),因此来自 DLQ 的过期消息被路由到原始队列。设置为默认交换是通过将属性设置为无值来实现的,如下例所示。

要强制对消息进行死信处理,请抛出AmqpRejectAndDontRequeueException或将requeueRejected设置为true(默认值)并抛出任何异常。

循环无休止地 continue 进行,这对于瞬态问题很好,但是您可能需要在尝试几次后放弃。幸运的是,RabbitMQ 提供了x-deathHeaders,可让您确定发生了多少个循环。

要在放弃后确认消息,请抛出ImmediateAcknowledgeAmqpException

41.4.1 放在一起

以下配置创建一个具有队列myDestination.consumerGroup的交换myDestination,队列myDestination.consumerGroup绑定到具有通配符路由键#的主题交换:

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此配置创建绑定到使用路由密钥myDestination.consumerGroup的直接交换机(DLX)的 DLQ。当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,该消息到期,并通过使用队列名称作为路由键将其路由到原始队列,如以下示例所示:

Spring Boot 应用程序.

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-deathHeaders 中的 count 属性是Long

41.5 错误通道

从版本 1.3 开始,绑定程序无条件地将异常发送到每个使用者目的地的错误通道,也可以将其配置为将异步生产者发送失败消息发送到错误通道。有关更多信息,请参见“ ???”。

RabbitMQ 有两种类型的发送失败:

后者很少见。根据 RabbitMQ 文档,“ [仅在负责队列的 Erlang 进程中发生内部错误时,才会传递[nack]”。

除了启用生产者错误通道(如“ ???”中所述)外,RabbitMQ 绑定器仅在正确配置了连接工厂的情况下才将消息发送到通道,如下所示:

  • ccf.setPublisherConfirms(true);

  • ccf.setPublisherReturns(true);

将 Spring Boot 配置用于连接工厂时,请设置以下属性:

  • spring.rabbitmq.publisher-confirms

  • spring.rabbitmq.publisher-returns

返回消息的ErrorMessage的有效负载是具有以下属性的ReturnedAmqpMessageException

  • failedMessage:Spring 消息Message<?>发送失败。

  • amqpMessage:原始 spring-amqp Message

  • replyCode:指示失败原因的整数值(例如 312-无路由)。

  • replyText:指示失败原因的文本值(例如NO_ROUTE)。

  • exchange:发布消息的 Transaction 所。

  • routingKey:发布消息时使用的路由键。

对于否定确认的确认,有效负载是具有以下属性的NackedAmqpMessageException

  • failedMessage:Spring 消息Message<?>发送失败。

  • nackReason:原因(如果可用,则可能需要检查代理日志以了解更多信息)。

无法自动处理这些异常(例如发送到dead-letter queue)。您可以通过自己的 Spring Integration 流使用这些异常。

41.6 死信队列处理

因为您无法预期用户将如何处置死信,所以该框架没有提供任何标准机制来处理它们。如果死信的原因是短暂的,则您可能希望将消息路由回原始队列。但是,如果问题是永久性问题,则可能导致无限循环。下面的 Spring Boot 应用程序显示了一个示例,该示例演示如何将这些消息路由回原始队列,但在尝试三次后将其移至第三个“停车场”队列。第二个示例使用RabbitMQ 延迟消息交换为重新排队的消息引入延迟。在此示例中,每次尝试的延迟都会增加。这些示例使用@RabbitListener接收来自 DLQ 的消息。您也可以在批处理过程中使用RabbitTemplate.receive()

这些示例假定原始目的地为so8400in,而使用者组为so8400

41.6.1 非分区目标

前两个示例适用于目标未分区的情况:

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

41.6.2 分区目标

对于分区目标,所有分区都有一个 DLQ。我们从标题确定原始队列。

republishToDlq=false

republishToDlqfalse时,RabbitMQ 使用x-deathHeaders 将消息发布到 DLX/DLQ,该 Headers 包含有关原始目的地的信息,如以下示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

republishToDlq=true

republishToDlqtrue时,重新发布的恢复程序会将原始交换和路由密钥添加到 Headers 中,如以下示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

41.7 使用 RabbitMQBinder 进行分区

RabbitMQ 不支持本地分区。

有时,将数据发送到特定分区是有利的-例如,当您要严格 Order 消息处理时,特定 Client 的所有消息都应转到同一分区。

RabbitMessageChannelBinder通过将每个分区的队列绑定到目标交换机来提供分区。

以下 Java 和 YAML 示例显示了如何配置生产者:

Producer.

@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml.

spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

Note

前例中的配置使用默认分区(key.hashCode() % partitionCount)。根据键值,这可能会或可能不会提供适当的平衡算法。您可以使用partitionSelectorExpressionpartitionSelectorClass属性覆盖此默认设置。

只有在部署生产方时需要供应方队列时,才需要required-groups属性。否则,发送到分区的所有消息都将丢失,直到部署了相应的使用者为止。

以下配置提供了主题交换:

part exchange

该交换绑定了以下队列:

part queues

以下绑定将队列与交换关联:

part bindings

以下 Java 和 YAML 示例是前面示例的 continue,并显示了如何配置使用者:

Consumer.

@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}

application.yml.

spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

Tip

RabbitMessageChannelBinder不支持动态缩放。每个分区至少必须有一个使用者。使用者的instanceIndex用于指示使用哪个分区。诸如 Cloud Foundry 之类的平台只能有一个实例带有instanceIndex