38. RabbitMQ Binder
38.1 用法
要使用 RabbitMQ binder,只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream application:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream RabbitMQ Starter。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
38.2 RabbitMQ Binder 概述
下面是 RabbitMQ binder 如何操作的简化图。
图 1_.RabbitMQ Binder
RabbitMQ Binder implementation maps 将每个目标映射到TopicExchange
。对于每个 consumer group,Queue
将绑定到TopicExchange
。每个 consumer 实例的 group 的Queue
都有一个对应的 RabbitMQ Consumer
实例。对于分区 producers/consumers,队列以分区索引为后缀,并使用分区索引作为路由 key。
使用autoBindDlq
选项,您可以选择配置 binder 来创建和配置 dead-letter 队列(DLQ)(以及 dead-letter 交换DLX
)。死信队列具有目标的 name,附加.dlq
。如果启用了重试(maxAttempts > 1
),则失败的消息将被传递到 DLQ。如果禁用重试(maxAttempts = 1
),则应将requeueRejected
设置为false
(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。另外,republishToDlq
会导致 binder 将失败的消息发布到 DLQ(而不是拒绝它);这样可以在_header 中的消息中添加其他信息,例如x-exception-stacktrace
标头中的堆栈跟踪。此选项不需要重试启用;只需一次尝试即可重新发布失败的消息。从 version 1.2 开始,您可以配置重新发布的消息的传递模式;见 property republishDeliveryMode
。
将
requeueRejected
设置为true
将导致消息被重新排队并连续重新传送,除非故障问题是暂时的,否则这可能不是您想要的。通常,最好通过将maxAttempts
设置为大于 1 来启用 binder 中的重试,或将republishToDlq
设置为true
。
有关这些 properties 的更多信息,请参见第 38.3.1 节,“RabbitMQ Binder Properties”。
framework 没有提供消耗 dead-letter 消息的任何标准机制(或者 re-route 消息返回主队列)。一些选项在第 38.6 节,“Dead-Letter 队列处理”中描述。
当在 Spring Cloud Stream application 中使用多个 RabbitMQ binders 时,禁用'RabbitAutoConfiguration'以避免将
RabbitAutoConfiguration
应用于两个 binders 的相同配置非常重要。
从 version 1.3 开始,RabbitMessageChannelBinder
为 non-transactional 生成器创建一个内部ConnectionFactory
副本,以避免在共享时消费者死锁,由于 Broker 上的Memory 报警而阻塞了高速缓存连接。
38.3 Configuration 选项
此部分包含特定于 RabbitMQ Binder 和绑定 channels 的设置。
有关常规 binding configuration 选项和 properties,请参阅Spring Cloud Stream 核心文档。
38.3.1 RabbitMQ Binder Properties
默认情况下,RabbitMQ binder 使用 Spring Boot 的ConnectionFactory
,因此它支持 RabbitMQ 的所有 Spring Boot configuration 选项。 (对于 reference,请参阅Spring Boot 文档 .) RabbitMQ configuration 选项使用spring.rabbitmq
前缀。
除了 Spring Boot 选项之外,RabbitMQ binder 还支持以下 properties:
-
spring.cloud.stream.rabbit.binder.adminAddresses
- 一个 comma-separated RabbitMQ management 插件 URL 列表。仅在
nodes
包含多个条目时使用。此列表中的每个条目都必须在spring.rabbitmq.addresses
中具有相应的条目。 **仅在您使用 RabbitMQ cluster 并希望从承载队列的节点使用时才需要.**有关详细信息,请参阅 Queue Affinity 和 LocalizedQueueConnectionFactory.
- 一个 comma-separated RabbitMQ management 插件 URL 列表。仅在
默认值:空。
-
spring.cloud.stream.rabbit.binder.nodes
- RabbitMQ 节点名称的 comma-separated 列表。当多个条目用于查找队列所在的服务器地址时。此列表中的每个条目都必须在
spring.rabbitmq.addresses
中具有相应的条目。 **仅在您使用 RabbitMQ cluster 并希望从承载队列的节点使用时才需要.**有关详细信息,请参阅 Queue Affinity 和 LocalizedQueueConnectionFactory.
- RabbitMQ 节点名称的 comma-separated 列表。当多个条目用于查找队列所在的服务器地址时。此列表中的每个条目都必须在
默认值:空。
-
spring.cloud.stream.rabbit.binder.compressionLevel
- 压缩绑定的压缩 level。见
java.util.zip.Deflater
。
- 压缩绑定的压缩 level。见
默认值:1
(BESTLEVEL)。
38.3.2 RabbitMQ Consumer Properties
以下 properties 仅适用于 Rabbit 使用者,必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
为前缀。
-
acknowledgeMode
- 确认模式。
默认值:AUTO
。
-
autoBindDlq
- 是否自动声明 DLQ 并将其绑定到 binder DLX。
默认值:false
。
-
bindingRoutingKey
- 用于将队列绑定到交换的路由 key(如果
bindQueue
是true
)。对于分区目的地-<instanceIndex>
将被追加。
- 用于将队列绑定到交换的路由 key(如果
默认值:#
。
-
bindQueue
- 是否将队列绑定到目标交换;如果您已设置自己的基础架构并且之前已经 created/bound 队列,则设置为
false
。
- 是否将队列绑定到目标交换;如果您已设置自己的基础架构并且之前已经 created/bound 队列,则设置为
默认值:true
。
-
deadLetterQueueName
- DLQ 的 name
默认值:prefix+destination.dlq
-
deadLetterExchange
- 要分配给队列的 DLX;如果 autoBindDlq 是 true
默认值:'前缀 DLX'
-
deadLetterRoutingKey
- 一个死信路由 key 分配给队列;如果 autoBindDlq 是 true
默认值:destination
-
declareExchange
- 是否申报目的地的交换。
默认值:true
。
-
delayedExchange
- 是否将交换声明为
Delayed Message Exchange
- 需要 broker 上的延迟消息交换插件。x-delayed-type
参数设置为exchangeType
。
- 是否将交换声明为
默认值:false
。
-
dlqDeadLetterExchange
- 如果声明了 DLQ,则分配给该队列的 DLX
默认值:none
-
dlqDeadLetterRoutingKey
- 如果声明了 DLQ,则将死信路由 key 分配给该队列;默认 none
默认值:none
-
dlqExpires
- 删除未使用的死信队列前的__(_)
默认值:no expiration
-
dlqLazy
- 使用
x-queue-mode=lazy
参数声明死信队列。见懒惰的队列。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。
- 使用
默认值:false
。
-
dlqMaxLength
- 死信队列中的最大消息数
默认值:no limit
-
dlqMaxLengthBytes
- 所有消息中死信队列中的最大总字节数
默认值:no limit
-
dlqMaxPriority
- 死信队列中的消息的最大优先级(0-255)
默认值:none
-
dlqTtl
- 声明时默认 time 生效以应用于死信队列(ms)
默认值:no limit
-
durableSubscription
- 订阅是否应该持久。仅在设置
group
时有效。
- 订阅是否应该持久。仅在设置
默认值:true
。
-
exchangeAutoDelete
- 如果
declareExchange
是 true,则交换是否应为 auto-delete(在删除最后一个队列后删除)。
- 如果
默认值:true
。
-
exchangeDurable
- 如果
declareExchange
是 true,那么交换是否应该是持久的(幸存 broker 重启)。
- 如果
默认值:true
。
-
exchangeType
- 交换类型;对于 non-partitioned 目的地
direct
,fanout
或topic
;direct
或topic
用于分区目的地。
- 交换类型;对于 non-partitioned 目的地
默认值:topic
。
-
独家
- 创建一个独有的 consumer;当这是
true
时,并发应该是 1;通常在需要严格的 ordering 时使用,但在发生故障后启用热备用实例。请参阅recoveryInterval
,它控制备用实例尝试使用的频率。
- 创建一个独有的 consumer;当这是
默认值:false
。
-
到期
- 在删除未使用的队列之前如何 long(ms)
默认值:no expiration
-
failedDeclarationRetryInterval
- 队列消耗尝试之间的间隔(ms)(如果缺少)。
默认值:5000
-
headerPatterns
- 要从入站消息映射 headers 的模式。
默认值:['*']
(所有 headers)。
-
懒
- 使用
x-queue-mode=lazy
参数声明队列。见懒惰的队列。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。
- 使用
默认值:false
。
-
maxConcurrency
- 最大消费者数量
默认值:1
。
-
最长长度
- 队列中的最大消息数
默认值:no limit
-
maxLengthBytes
- 所有消息中队列中的最大总字节数
默认值:no limit
-
maxPriority
- 队列中消息的最大优先级(0-255)
-
默认
none
-
missingQueuesFatal
- 如果找不到队列,请将条件视为致命,并停止 listener 容器。默认为
false
,以便容器继续尝试从队列中使用,例如,当使用 cluster 并且托管非 HA 队列的节点关闭时。
- 如果找不到队列,请将条件视为致命,并停止 listener 容器。默认为
-
默认
false
-
预取
- 预取计数。
默认值:1
。
-
字首
- 要添加到
destination
和队列的 name 的前缀。
- 要添加到
默认值:“”。
-
queueDeclarationRetries
- 如果丢失,则从队列中重试 consuming 的次数。仅在
missingQueuesFatal
为true
时才相关;否则容器会无限期地重试。
- 如果丢失,则从队列中重试 consuming 的次数。仅在
-
默认
3
-
queueNameGroupOnly
- 当 true 时,从 name 等于
group
的队列中消耗;否则队列 name 是destination.group
。这很有用,例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时。
- 当 true 时,从 name 等于
默认值:false。
-
recoveryInterval
- 连接恢复尝试之间的间隔,以毫秒为单位。
默认值:5000
。
-
requeueRejected
- 在禁用重试或 republishToDlq 为 false 时是否应重新排队传递失败。
默认值:false
。
-
republishDeliveryMode
- 当
republishToDlq
是true
时,指定重新发布的消息的传递模式。
- 当
默认值:DeliveryMode.PERSISTENT
-
republishToDlq
- 默认情况下,拒绝重试后失败的消息。如果配置了 dead-letter 队列(DLQ),RabbitMQ 将把失败的消息(未更改)路由到 DLQ。如果设置为
true
,binder 将使用其他 headers 将失败的消息重新发布到 DLQ,包括 exception 消息和最终失败原因的堆栈跟踪。
- 默认情况下,拒绝重试后失败的消息。如果配置了 dead-letter 队列(DLQ),RabbitMQ 将把失败的消息(未更改)路由到 DLQ。如果设置为
默认值:false
-
交易
- 是否使用事务处理 channels。
默认值:false
。
-
TTL
- 声明时默认 time 生效以应用于队列(ms)
默认值:no limit
-
txSize
- ack 之间的交付数量。
默认值:1
。
38.3.3 Rabbit Producer Properties
以下 properties 仅适用于 Rabbit 生成器,必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.
为前缀。
-
autoBindDlq
- 是否自动声明 DLQ 并将其绑定到 binder DLX。
默认值:false
。
-
batchingEnabled
- 是否启用生产者的消息批处理。
默认值:false
。
-
BATCHSIZE
- 启用批处理时要缓冲的消息数。
默认值:100
。
-
batchBufferLimit
- 默认值:
10000
。
- 默认值:
-
batchTimeout
- 默认值:
5000
。
- 默认值:
-
bindingRoutingKey
- 用于将队列绑定到交换的路由 key(如果
bindQueue
是true
)。仅适用于 non-partitioned 目的地。仅在提供requiredGroups
时才适用,然后仅适用于这些组。
- 用于将队列绑定到交换的路由 key(如果
默认值:#
。
-
bindQueue
- 是否将队列绑定到目标交换;如果您已设置自己的基础架构并且之前已 created/bound 队列,则设置为
false
。仅在提供requiredGroups
时才适用,然后仅适用于这些组。
- 是否将队列绑定到目标交换;如果您已设置自己的基础架构并且之前已 created/bound 队列,则设置为
默认值:true
。
-
压缩
- 是否应在发送时压缩数据。
默认值:false
。
-
deadLetterQueueName
- DLQ 的 name 仅在提供
requiredGroups
时适用,然后仅适用于这些组。
- DLQ 的 name 仅在提供
默认值:prefix+destination.dlq
-
deadLetterExchange
- 要分配给队列的 DLX;如果 autoBindDlq 是 true 仅在提供
requiredGroups
时才适用,然后仅适用于那些组。
- 要分配给队列的 DLX;如果 autoBindDlq 是 true 仅在提供
默认值:'前缀 DLX'
-
deadLetterRoutingKey
- 一个死信路由 key 分配给队列;如果 autoBindDlq 是 true 仅在提供
requiredGroups
时才适用,然后仅适用于那些组。
- 一个死信路由 key 分配给队列;如果 autoBindDlq 是 true 仅在提供
默认值:destination
-
declareExchange
- 是否申报目的地的交换。
默认值:true
。
-
延迟
- 用于评估应用于消息的延迟(
x-delay
标头)的 SpEL 表达式 - 如果交换不是延迟消息交换,则无效。
- 用于评估应用于消息的延迟(
默认值:未设置x-delay
标头。
-
delayedExchange
- 是否将交换声明为
Delayed Message Exchange
- 需要 broker 上的延迟消息交换插件。x-delayed-type
参数设置为exchangeType
。
- 是否将交换声明为
默认值:false
。
-
deliveryMode
- 交货方式。
默认值:PERSISTENT
。
-
dlqDeadLetterExchange
- 如果声明了 DLQ,则分配给该队列的 DLX 仅在提供
requiredGroups
时才适用,然后仅适用于那些组。
- 如果声明了 DLQ,则分配给该队列的 DLX 仅在提供
默认值:none
-
dlqDeadLetterRoutingKey
- 如果声明了 DLQ,则将死信路由 key 分配给该队列; default none 仅在提供
requiredGroups
时适用,然后仅适用于这些组。
- 如果声明了 DLQ,则将死信路由 key 分配给该队列; default none 仅在提供
默认值:none
-
dlqExpires
- 如何删除未使用的死信队列前的__(ms)仅在提供
requiredGroups
时才适用,然后仅适用于那些组。
- 如何删除未使用的死信队列前的__(ms)仅在提供
默认值:no expiration
-
dlqLazy
- 使用
x-queue-mode=lazy
参数声明死信队列。见懒惰的队列。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。仅在提供requiredGroups
时才适用,然后仅适用于这些组。
- 使用
-
dlqMaxLength
- 死信队列中的最大消息数仅在提供
requiredGroups
时才适用,然后仅适用于这些组。
- 死信队列中的最大消息数仅在提供
默认值:no limit
-
dlqMaxLengthBytes
- 所有消息的死信队列中的最大总字节数仅在提供
requiredGroups
时适用,然后仅适用于这些组。
- 所有消息的死信队列中的最大总字节数仅在提供
默认值:no limit
-
dlqMaxPriority
- 死信队列中消息的最大优先级(0-255)仅在提供
requiredGroups
时才适用,然后仅适用于这些组。
- 死信队列中消息的最大优先级(0-255)仅在提供
默认值:none
-
dlqTtl
- 声明时应用于死信队列的默认 time(ms)仅在提供
requiredGroups
时才适用,然后仅适用于那些组。
- 声明时应用于死信队列的默认 time(ms)仅在提供
默认值:no limit
-
exchangeAutoDelete
- 如果
declareExchange
是 true,则交换是否应为 auto-delete(在删除最后一个队列后删除)。
- 如果
默认值:true
。
-
exchangeDurable
- 如果
declareExchange
是 true,那么交换是否应该是持久的(幸存 broker 重启)。
- 如果
默认值:true
。
-
exchangeType
- 交换类型;对于 non-partitioned 目的地
direct
,fanout
或topic
;direct
或topic
用于分区目的地。
- 交换类型;对于 non-partitioned 目的地
默认值:topic
。
-
到期
- 如何删除未使用的队列之前的__(ms)仅在提供
requiredGroups
时才适用,然后仅适用于那些组。
- 如何删除未使用的队列之前的__(ms)仅在提供
默认值:no expiration
-
headerPatterns
- headers 的模式映射到出站消息。
默认值:['*']
(所有 headers)。
-
懒
- 使用
x-queue-mode=lazy
参数声明队列。见懒惰的队列。请考虑使用 policy 而不是此设置,因为使用 policy 允许更改设置而不删除队列。仅在提供requiredGroups
时才适用,然后仅适用于这些组。
- 使用
默认值:false
。
-
最长长度
- 队列中的最大消息数仅在提供
requiredGroups
时才适用,然后仅适用于这些组。
- 队列中的最大消息数仅在提供
默认值:no limit
-
maxLengthBytes
- 所有消息中队列中的最大总字节数仅在提供
requiredGroups
时适用,然后仅适用于这些组。
- 所有消息中队列中的最大总字节数仅在提供
默认值:no limit
-
maxPriority
- 队列中消息的最大优先级(0-255)仅在提供
requiredGroups
时才适用,然后仅适用于这些组。
- 队列中消息的最大优先级(0-255)仅在提供
-
默认
none
-
字首
- 要添加到
destination
交换的 name 的前缀。
- 要添加到
默认值:“”。
-
queueNameGroupOnly
- 当 true 时,从 name 等于
group
的队列中消耗;否则队列 name 是destination.group
。这很有用,例如,当使用 Spring Cloud Stream 从现有 RabbitMQ 队列中使用时。仅在提供requiredGroups
时才适用,然后仅适用于这些组。
- 当 true 时,从 name 等于
默认值:false。
-
routingKeyExpression
- 一个 SpEL 表达式,用于确定发布消息时要使用的路由 key。对于固定路由 key,请使用文字表达式 e.g. _pro在 properties 文件中,或
routingKeyExpression: '''my.routingKey'''
在 YAML 文件中。
- 一个 SpEL 表达式,用于确定发布消息时要使用的路由 key。对于固定路由 key,请使用文字表达式 e.g. _pro在 properties 文件中,或
默认值:destination
或destination-<partition>
表示分区目的地。
-
交易
- 是否使用事务处理 channels。
默认值:false
。
-
TTL
- 声明时应用于队列的默认 time(ms)仅在提供
requiredGroups
时才适用,然后仅适用于那些组。
- 声明时应用于队列的默认 time(ms)仅在提供
默认值:no limit
在 RabbitMQ 的情况下,content type headers 可以由外部 applications 设置。 Spring Cloud Stream 支持它们作为扩展内部协议的一部分,用于任何类型的传输(包括传输,例如 Kafka,通常不支持 headers)。
38.4 使用 RabbitMQ Binder 重试
38.4.1 概述
在 binder 中启用重试时,listener 容器线程将暂停以配置任何后退时段。当单个 consumer 需要严格的 ordering 时,这可能很重要,但对于其他用例,它可以防止在该线程上处理其他消息。使用 binder 重试的另一种方法是使用 time 设置死字法以生活在 dead-letter 队列(DLQ)上,以及 DLQ 本身上的 dead-letter configuration。有关此处讨论的 properties 的更多信息,请参见第 38.3.1 节,“RabbitMQ Binder Properties”。 Example configuration 以启用此 feature:
-
将
autoBindDlq
设置为true
- binder 将创建 DLQ;您可以选择在deadLetterQueueName
中指定 name -
将
dlqTtl
设置为您想要在重新传送之间等待的__ back -
将
dlqDeadLetterExchange
设置为默认交换 - 来自 DLQ 的过期消息将路由到原始队列,因为默认deadLetterRoutingKey
是队列 name(destination.group
)
要强制消息为 dead-lettered,要么抛出AmqpRejectAndDontRequeueException
,要么将requeueRejected
设置为true
并抛出任何 exception。
循环将继续没有结束,这对于瞬态问题很好,但是你可能想在经过一些尝试后放弃。幸运的是,RabbitMQ 提供x-death
标头,允许您确定发生了多少个循环。
要在放弃后确认消息,请抛出ImmediateAcknowledgeAmqpException
。
38.4.2 全部放在一起
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
此 configuration 创建一个 exchange myDestination
,其队列myDestination.consumerGroup
绑定到带有通配符路由 key #
的 topic 交换。它创建一个 DLQ 绑定到直接交换DLX
与路由 key myDestination.consumerGroup
。当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,消息到期并使用 queue name 作为路由 key 路由到原始队列。
Spring Boot application.
@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
}
}
请注意,x-death
标头中的 count property 是Long
。
38.5 错误 Channels
从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且可以配置为将异步 producer 发送失败发送到错误 channel。有关更多信息,请参见名为“Message Channel Binder and Error Channels”的部分。
使用 rabbitmq,有两种类型的发送失败:
-
返回的消息
-
否定承认出版商确认
后者很少见;引用 RabbitMQ 文档“[228]只有在负责队列的 Erlang process 中发生内部错误时才会被传递。”
除了中所述启用 producer 错误 channels 之外,如果连接工厂配置正确,RabbitMQ binder 将仅向 channels 发送消息:
-
ccf.setPublisherConfirms(true);
-
ccf.setPublisherReturns(true);
将 spring boot configuration 用于连接工厂时,请设置 properties:
-
spring.rabbitmq.publisher-confirms
-
spring.rabbitmq.publisher-returns
返回消息的ErrorMessage
的有效负载是带有 properties 的ReturnedAmqpMessageException
:
-
failedMessage
- 无法发送的 spring-messagingMessage<?>
。 -
amqpMessage
- 原始 spring-amqpMessage
-
replyCode
- integer value 指示失败的原因(e.g. 312 - 否 route) -
replyText
- 表示失败原因 e.g 的文本 value。NO_ROUTE
。 -
exchange
- 消息发布的交换。 -
routingKey
- 发布消息时使用的路由 key。
对于否定确认的确认,有效负载是带有 properties 的NackedAmqpMessageException
:
-
failedMessage
- 无法发送的 spring-messagingMessage<?>
。 -
nackReason
- 一个原因(如果可用;您可能需要检查 broker 日志以获取更多信息)。
没有自动处理这些 exceptions(例如发送到Dead-Letter 队列);您可以使用自己的 Spring Integration 流来使用这些 exceptions。
38.6 Dead-Letter 队列处理
由于无法预测用户如何处理 dead-lettered 消息,framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始队列。但是,如果问题是一个永久性问题,那么可能会导致无限循环。以下spring-boot
application 是如何将这些消息重新路由回原始队列的示例,但在三次尝试后将它们移动到第三个“停车场”队列。第二个 example 使用RabbitMQ 延迟消息交换为重新排队的消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用@RabbitListener
从 DLQ 接收消息,您也可以在批处理 process 中使用RabbitTemplate.receive()
。
这些示例假设原始目标是so8400in
,consumer group 是so8400
。
38.6.1 Non-Partitioned 目的地
前两个示例是目标**未分区的情况。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
38.6.2 分区目的地
对于分区目标,所有分区都有一个 DLQ,我们从 headers 确定原始队列。
republishToDlq=false
当republishToDlq
是false
时,RabbitMQ 使用包含有关原始目标的信息的x-death
标头将消息发布到 DLX/DLQ。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@SuppressWarnings("unchecked")
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
republishToDlq=true
当republishToDlq
是true
时,重新发布恢复器会将原始交换和路由 key 添加到 headers。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}