39. RabbitMQ Binder

39.1 用法

要使用 RabbitMQ binder,可以使用以下 Maven 坐标将其添加到 Spring Cloud Stream application:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

39.2 RabbitMQ Binder 概述

以下简化图显示了 RabbitMQ binder 的运行方式:

图 1_.RabbitMQ Binder

rabbit binder

默认情况下,RabbitMQ Binder implementation maps 将每个目标映射到TopicExchange。对于每个 consumer group,Queue绑定到TopicExchange。每个 consumer 实例的 group 的Queue都有一个对应的 RabbitMQ Consumer实例。对于分区生成器和使用者,队列以分区索引为后缀,并使用分区索引作为路由 key。对于匿名使用者(没有group property 的用户),使用 auto-delete 队列(具有随机的唯一 name)。

通过使用可选的autoBindDlq选项,您可以配置 binder 以创建和配置 dead-letter 队列(DLQ)(以及 dead-letter 交换DLX,以及路由基础结构)。默认情况下,死信队列具有目标的 name,并附加.dlq。如果启用了重试(maxAttempts > 1),则在重试耗尽后,失败的消息将传递到 DLQ。如果禁用重试(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便将失败的消息路由到 DLQ,而不是 re-queued。此外,republishToDlq会导致 binder 将失败的消息发布到 DLQ(而不是拒绝它)。此 feature 允许将其他信息(例如x-exception-stacktrace标题中的堆栈跟踪)添加到 headers 中的消息中。此选项不需要重试。只需一次尝试,您就可以重新发布失败的消息。从 version 1.2 开始,您可以配置重新发布的消息的传递模式。见republishDeliveryMode property

requeueRejected设置为true(使用republishToDlq=false)会导致消息 re-queued 并连续重新传递,这可能不是您想要的,除非失败的原因是暂时的。通常,您应该通过将maxAttempts设置为大于 1 或将republishToDlq设置为true来启用 binder 中的重试。

有关这些 properties 的更多信息,请参见第 39.3.1 节,“RabbitMQ Binder Properties”

framework 没有提供消耗 dead-letter 消息的任何标准机制(或者 re-route 消息返回主队列)。一些选项在第 39.6 节,“Dead-Letter 队列处理”中描述。

当在 Spring Cloud Stream application 中使用多个 RabbitMQ binders 时,禁用'RabbitAutoConfiguration'以避免从RabbitAutoConfiguration应用于两个 binders 的相同配置非常重要。您可以使用@SpringBootApplication annotation 排除 class。

从 version 2.0 开始,RabbitMessageChannelBinder _set 将RabbitTemplate.userPublisherConnection property 设置为true,以便 non-transactional 生成器避免消费者死锁,如果缓存连接因 broker 上的memory 警报而被阻止,则可能发生死锁。

39.3 Configuration 选项

此部分包含特定于 RabbitMQ Binder 和绑定 channels 的设置。

有关常规 binding configuration 选项和 properties,请参阅Spring Cloud Stream 核心文档

39.3.1 RabbitMQ Binder Properties

默认情况下,RabbitMQ binder 使用 Spring Boot 的ConnectionFactory。因此,它支持 RabbitMQ 的所有 Spring Boot configuration 选项。 (对于 reference,请参阅Spring Boot 文档)。 RabbitMQ configuration 选项使用spring.rabbitmq前缀。

除了 Spring Boot 选项之外,RabbitMQ binder 还支持以下 properties:

  • spring.cloud.stream.rabbit.binder.adminAddresses

    • RabbitMQ management 插件 URL 的 comma-separated 列表。仅在nodes包含多个条目时使用。此列表中的每个条目必须在spring.rabbitmq.addresses中具有相应的条目。仅在您使用 RabbitMQ cluster 并希望从承载队列的节点使用时才需要。有关更多信息,请参见队列亲和力和 LocalizedQueueConnectionFactory

默认值:空。

  • spring.cloud.stream.rabbit.binder.nodes

    • 一个 comma-separated RabbitMQ 节点名称列表。当多个条目用于查找队列所在的服务器地址时。此列表中的每个条目都必须在spring.rabbitmq.addresses中具有相应的条目。仅在您使用 RabbitMQ cluster 并希望从承载队列的节点使用时才需要。有关更多信息,请参见队列亲和力和 LocalizedQueueConnectionFactory

默认值:空。

  • spring.cloud.stream.rabbit.binder.compressionLevel

    • 压缩绑定的压缩 level。见java.util.zip.Deflater

默认值:1(BESTLEVEL)。

  • spring.cloud.stream.binder.connection-name-prefix

    • 用于 name connection(s 的连接 name 前缀,由 binder 创建。 name 是此前缀后跟#n,其中n在每次打开新连接时递增。

默认值:none(Spring AMQP 默认值)。

39.3.2 RabbitMQ Consumer Properties

以下 properties 仅适用于 Rabbit 使用者,必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.为前缀。

  • acknowledgeMode

    • 确认模式。

默认值:AUTO

  • autoBindDlq

    • 是否自动声明 DLQ 并将其绑定到 binder DLX。

默认值:false

  • bindingRoutingKey

    • 用于将队列绑定到交换的路由 key(如果bindQueuetrue)。对于分区目标,将追加-<instanceIndex>

默认值:#

  • bindQueue

    • 是否将队列绑定到目标交换机。如果您已设置自己的基础架构并且之前已创建并绑定队列,请将其设置为false

默认值:true

  • deadLetterQueueName

    • DLQ 的 name

默认值:prefix+destination.dlq

  • deadLetterExchange

    • 要分配给队列的 DLX。仅当autoBindDlqtrue时才相关。

默认值:'前缀 DLX'

  • deadLetterRoutingKey

    • 一个死信路由 key 分配给队列。仅当autoBindDlqtrue时才相关。

默认值:destination

  • declareExchange

    • 是否申报目的地的交换。

默认值:true

  • delayedExchange

    • 是否将交换声明为Delayed Message Exchange。需要 broker 上的延迟消息交换插件。 x-delayed-type参数设置为exchangeType

默认值:false

  • dlqDeadLetterExchange

    • 如果声明了 DLQ,则分配给该队列的 DLX。

默认值:none

  • dlqDeadLetterRoutingKey

    • 如果声明了 DLQ,则将死信路由 key 分配给该队列。

默认值:none

  • dlqExpires

    • 删除未使用的死信队列之前的__(以毫秒为单位)。

默认值:no expiration

  • dlqLazy

    • 使用x-queue-mode=lazy参数声明死信队列。见“懒惰的队列”。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。

默认值:false

  • dlqMaxLength

    • 死信队列中的最大消息数。

默认值:no limit

  • dlqMaxLengthBytes

    • 所有消息中死信队列中的最大总字节数。

默认值:no limit

  • dlqMaxPriority

    • 死信队列(0-255)中消息的最大优先级。

默认值:none

  • dlqTtl

    • 默认 time 生效以在声明时应用于死信队列(以毫秒为单位)。

默认值:no limit

  • durableSubscription

    • 订阅是否应该是持久的。仅在设置group时有效。

默认值:true

  • exchangeAutoDelete

    • 如果declareExchange是 true,则交换是否应该是 auto-deleted(即删除最后一个队列后删除)。

默认值:true

  • exchangeDurable

    • 如果declareExchange是 true,那么交换是否应该是持久的(也就是说,它在 broker 重启后仍然存在)。

默认值:true

  • exchangeType

    • 交换类型:non-partitioned,fanouttopic用于 non-partitioned 目的地,directtopic用于分区目的地。

默认值:topic

  • 独家

    • 是否创建独家 consumer。当true时,并发应为 1。通常在需要严格 ordering 但在发生故障后启用热备用实例时使用。请参阅recoveryInterval,它控制备用实例尝试使用的频率。

默认值:false

  • 到期

    • 如何删除未使用的队列前的__(以毫秒为单位)。

默认值:no expiration

  • failedDeclarationRetryInterval

    • 如果缺少队列,则尝试从队列中消耗的间隔(以毫秒为单位)。

默认值:5000

  • headerPatterns

    • 要从入站消息映射 headers 的模式。

默认值:['*'](所有 headers)。

    • 使用x-queue-mode=lazy参数声明队列。见“懒惰的队列”。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。

默认值:false

  • maxConcurrency

    • 最大消费者数量。

默认值:1

  • 最长长度

    • 队列中的最大消息数。

默认值:no limit

  • maxLengthBytes

    • 所有消息中队列中的最大总字节数。

默认值:no limit

  • maxPriority

    • 队列中消息的最大优先级(0-255)。

默认值:none

  • missingQueuesFatal

    • 当无法找到队列时,是否将条件视为致命并停止 listener 容器。默认为false,以便容器继续尝试从队列中消耗 - 例如,当使用 cluster 并且托管 non-HA 队列的节点关闭时。

默认值:false

  • 预取

    • 预取计数。

默认值:1

  • 字首

    • 要添加到destination和队列的 name 的前缀。

默认值:“”。

  • queueDeclarationRetries

    • 如果丢失,则从队列中重试 consuming 的次数。仅当missingQueuesFataltrue时才相关。否则,容器将无限期地重试。

默认值:3

  • queueNameGroupOnly

    • 当 true 时,从 name 等于group的队列中消耗。否则队列 name 是destination.group。这很有用,例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时。

默认值:false。

  • recoveryInterval

    • 连接恢复尝试之间的间隔,以毫秒为单位。

默认值:5000

  • requeueRejected

    • 当禁用重试或republishToDlqfalse时,传递失败应该是 re-queued。

默认值:false

  • republishDeliveryMode

    • republishToDlqtrue时,指定重新发布的消息的传递模式。

默认值:DeliveryMode.PERSISTENT

  • republishToDlq

    • 默认情况下,拒绝重试后失败的邮件。如果配置了 dead-letter 队列(DLQ),则 RabbitMQ 将失败的消息(未更改)路由到 DLQ。如果设置为true,binder 会使用其他 headers 将失败的消息重新发布到 DLQ,包括 exception 消息和最终失败原因的堆栈跟踪。

默认值:false

  • 交易

    • 是否使用事务处理 channels。

默认值:false

  • TTL

    • 声明时的默认 time 以应用于队列(以毫秒为单位)。

默认值:no limit

  • txSize

    • ack 之间的交付数量。

默认值:1

39.3.3 Rabbit Producer Properties

以下 properties 仅适用于 Rabbit 生成器,必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.为前缀。

  • autoBindDlq

    • 是否自动声明 DLQ 并将其绑定到 binder DLX。

默认值:false

  • batchingEnabled

    • 是否启用生产者的消息批处理。根据以下 properties(在此列表中的下三个条目中描述)将消息批处理为一条消息:'batchSize',batchBufferLimitbatchTimeout。有关更多信息,请参见配料

默认值:false

  • BATCHSIZE

    • 启用批处理时要缓冲的消息数。

默认值:100

  • batchBufferLimit

    • 启用批处理时的最大缓冲区大小。

默认值:10000

  • batchTimeout

    • 批处理启用时的批处理超时。

默认值:5000

  • bindingRoutingKey

    • 用于将队列绑定到交换的路由 key(如果bindQueuetrue)。仅适用于 non-partitioned 目的地。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:#

  • bindQueue

    • 是否将队列绑定到目标交换机。如果您已设置自己的基础架构并且之前已创建并绑定队列,请将其设置为false。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:true

  • 压缩

    • 是否应在发送时压缩数据。

默认值:false

  • deadLetterQueueName

    • DLQ 的 name 仅在提供requiredGroups时适用,然后仅适用于这些组。

默认值:prefix+destination.dlq

  • deadLetterExchange

    • 要分配给队列的 DLX。仅当autoBindDlqtrue时才相关。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:'前缀 DLX'

  • deadLetterRoutingKey

    • 一个死信路由 key 分配给队列。仅当autoBindDlqtrue时才相关。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:destination

  • declareExchange

    • 是否申报目的地的交换。

默认值:true

  • delayExpression

    • 一个 SpEL 表达式,用于评估应用于消息的延迟(x-delay标头)。如果交换不是延迟消息交换,则无效。

默认值:未设置x-delay标头。

  • delayedExchange

    • 是否将交换声明为Delayed Message Exchange。需要 broker 上的延迟消息交换插件。 x-delayed-type参数设置为exchangeType

默认值:false

  • deliveryMode

    • 交货方式。

默认值:PERSISTENT

  • dlqDeadLetterExchange

    • 声明 DLQ 时,将分配给该队列的 DLX。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:none

  • dlqDeadLetterRoutingKey

    • 声明 DLQ 时,将死信路由 key 分配给该队列。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:none

  • dlqExpires

    • 如何删除未使用的死信队列之前的_(以毫秒为单位)。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no expiration

  • dlqLazy

    • 使用x-queue-mode=lazy参数声明死信队列。见“懒惰的队列”。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。仅在提供requiredGroups时才适用,然后仅适用于这些组。
  • dlqMaxLength

    • 死信队列中的最大消息数。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no limit

  • dlqMaxLengthBytes

    • 所有消息中死信队列中的最大总字节数。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no limit

  • dlqMaxPriority

    • 死信队列中的消息的最大优先级(0-255)仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:none

  • dlqTtl

    • 声明时应用于死信队列的默认 time(以毫秒为单位)。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no limit

  • exchangeAutoDelete

    • 如果declareExchangetrue,则交换是否应该是 auto-delete(在删除最后一个队列后删除它)。

默认值:true

  • exchangeDurable

    • 如果declareExchangetrue,交换是否应该是持久的(幸存 broker 重启)。

默认值:true

  • exchangeType

    • 交换类型:non-partitioned,fanouttopic用于 non-partitioned 目的地,directtopic用于分区目的地。

默认值:topic

  • 到期

    • 删除未使用的队列之前如何_(以毫秒为单位)。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no expiration

  • headerPatterns

    • headers 的模式映射到出站消息。

默认值:['*'](所有 headers)。

    • 使用x-queue-mode=lazy参数声明队列。见“懒惰的队列”。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:false

  • 最长长度

    • 队列中的最大消息数。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no limit

  • maxLengthBytes

    • 所有消息中队列中的最大总字节数。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no limit

  • maxPriority

    • 队列中消息的最大优先级(0-255)。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:none

  • 字首

    • 要添加到destination交换的 name 的前缀。

默认值:“”。

  • queueNameGroupOnly

    • true时,从 name 等于group的队列中消耗。否则队列 name 是destination.group。这很有用,例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:false。

  • routingKeyExpression

    • 一个 SpEL 表达式,用于确定发布消息时要使用的路由 key。对于固定路由 key,请使用文字表达式,例如 properties 文件中的routingKeyExpression='my.routingKey'或 YAML 文件中的routingKeyExpression: '''my.routingKey'''

默认值:destinationdestination-<partition>表示分区目的地。

  • 交易

    • 是否使用事务处理 channels。

默认值:false

  • TTL

    • 声明时应用于队列的默认 time(以毫秒为单位)。仅在提供requiredGroups时才适用,然后仅适用于这些组。

默认值:no limit

在 RabbitMQ 的情况下,content type headers 可以由外部 applications 设置。 Spring Cloud Stream 支持它们作为扩展内部协议的一部分,用于任何类型的传输 - 包括传输,例如 Kafka(在 0.11 之前),本身不支持 headers。

39.4 使用 RabbitMQ Binder 重试

在 binder 中启用重试时,listener 容器线程将暂停以配置任何后退时段。当单个 consumer 需要严格的 ordering 时,这可能很重要。但是,对于其他用例,它会阻止在该线程上处理其他消息。使用 binder 重试的另一种方法是使用 time 设置死字法以生活在 dead-letter 队列(DLQ)上以及 DLQ 本身上的 dead-letter configuration。有关此处讨论的 properties 的更多信息,请参阅“第 39.3.1 节,“RabbitMQ Binder Properties””。您可以使用以下 example configuration 来启用此 feature:

  • autoBindDlq设置为true。 binder 创建一个 DLQ。 (可选)您可以在deadLetterQueueName中指定 name。

  • dlqTtl设置为您想要在重新传送之间等待的__ back。

  • dlqDeadLetterExchange设置为默认交换。来自 DLQ 的过期消息将路由到原始队列,因为缺省的deadLetterRoutingKey是队列 name(destination.group)。设置为默认交换是通过将 property 设置为 no value 来实现的,如下一个 example 所示。

要强制消息为 dead-lettered,要么抛出AmqpRejectAndDontRequeueException或将requeueRejected设置为true(默认值)并抛出任何 exception。

循环继续没有结束,这对于瞬态问题很好,但是你可能想在经过一些尝试后放弃。幸运的是,RabbitMQ 提供了x-death标头,可以让您确定发生了多少个循环。

要在放弃后确认消息,请抛出ImmediateAcknowledgeAmqpException

39.4.1 全部放在一起

以下 configuration 创建一个 exchange myDestination,其队列myDestination.consumerGroup绑定到带有通配符路由 key #的 topic 交换:

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此 configuration 创建一个 DLQ 绑定到直接交换(DLX),路由 key 为myDestination.consumerGroup。当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,消息过期并使用 queue name 作为路由 key 路由到原始队列,如以下 example 所示:

Spring Boot application.

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-death标头中的 count property 是Long

39.5 错误 Channels

从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且还可以配置为将异步 producer 发送失败发送到错误 channel。有关详细信息,请参阅“???”。

RabbitMQ 有两种类型的发送失败:

后者很少见。根据 RabbitMQ 文档“只有在负责队列的 Erlang process 中发生内部错误时才会传递”[258]。“

除了启用 producer 错误 channels(如“???”中所述)之外,如果连接工厂配置正确,RabbitMQ binder 仅向 channel 发送消息,如下所示:

  • ccf.setPublisherConfirms(true);

  • ccf.setPublisherReturns(true);

将 Spring Boot configuration 用于连接工厂时,请设置以下 properties:

  • spring.rabbitmq.publisher-confirms

  • spring.rabbitmq.publisher-returns

返回消息的ErrorMessage的有效负载是带有以下 properties 的ReturnedAmqpMessageException

  • failedMessage:无法发送的 spring-messaging Message<?>

  • amqpMessage:原始 spring-amqp Message

  • replyCode:integer value 指示失败的原因(对于 example,312 - No route)。

  • replyText:表示失败原因的文本 value(对于 example,NO_ROUTE)。

  • exchange:消息发布的交换。

  • routingKey:发布消息时使用的路由 key。

对于否定确认的确认,有效负载是NackedAmqpMessageException,具有以下 properties:

  • failedMessage:无法发送的 spring-messaging Message<?>

  • nackReason:一个原因(如果可用 - 您可能需要检查 broker 日志以获取更多信息)。

没有自动处理这些 exceptions(例如发送到dead-letter 队列)。您可以使用自己的 Spring Integration 流使用这些 exceptions。

39.6 Dead-Letter 队列处理

因为您无法预测用户将如何处理 dead-lettered 消息,所以 framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始队列。但是,如果问题是一个永久性问题,那么可能会导致无限循环。以下 Spring Boot application 显示了如何将这些消息重新路由回原始队列的示例,但在三次尝试后将它们移动到第三个“停车场”队列。第二个 example 使用RabbitMQ 延迟消息交换为 re-queued 消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用@RabbitListener来接收来自 DLQ 的消息。您还可以在批处理 process 中使用RabbitTemplate.receive()

这些示例假设原始目标是so8400in,consumer group 是so8400

39.6.1 Non-Partitioned 目的地

前两个示例是针对目标未**分区的时间:

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

39.6.2 分区目的地

对于分区目标,所有分区都有一个 DLQ。我们从 headers 确定原始队列。

republishToDlq=false

republishToDlqfalse时,RabbitMQ 将消息发布到 DLX/DLQ,并带有包含有关原始目标的信息的x-death标头,如下面的示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

republishToDlq=true

republishToDlqtrue时,重新发布恢复器会将原始交换和路由 key 添加到 headers,如下面的示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

39.7 使用 RabbitMQ Binder 进行分区

RabbitMQ 本身不支持分区。

有时,将数据发送到特定分区是有利的 - 例如,当您要严格 order 消息处理时,特定客户的所有消息都应该转到同一分区。

RabbitMessageChannelBinder通过将每个分区的队列绑定到目标交换机来提供分区。

以下 Java 和 YAML 示例显示了如何配置 producer:

监制.

@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml.

spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前面 example 中的 configuration 使用默认分区(key.hashCode() % partitionCount)。根据 key 值,这可能会也可能不会提供适当平衡的算法。您可以使用partitionSelectorExpressionpartitionSelectorClass properties 覆盖此默认值。

只有在部署 producer 时需要配置 consumer 队列时,才需要required-groups property。否则,在部署相应的 consumer 之前,发送到分区的任何消息都将丢失。

以下 configuration 规定 topic 交换:

部分交换

以下队列绑定到该交换:

部分队列

以下绑定将队列关联到交换:

部分绑定

以下 Java 和 YAML 示例继续前面的示例并演示如何配置 consumer:

消费者.

@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}

application.yml.

spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

RabbitMessageChannelBinder不支持动态缩放。每个分区必须至少有一个 consumer。 consumer 的instanceIndex用于指示使用哪个分区。像 Cloud Foundry 这样的平台只能有一个带有instanceIndex的实例。