38. RabbitMQ Binder

38.1 用法

要使用 RabbitMQ binder,只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream application:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream RabbitMQ Starter。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

38.2 RabbitMQ Binder 概述

下面是 RabbitMQ binder 如何操作的简化图。

图 1_.RabbitMQ Binder

RabbitMQ Binder implementation maps 将每个目标映射到TopicExchange。对于每个 consumer group,Queue将绑定到TopicExchange。每个 consumer 实例的 group 的Queue都有一个对应的 RabbitMQ Consumer实例。对于分区 producers/consumers,队列以分区索引为后缀,并使用分区索引作为路由 key。

使用autoBindDlq选项,您可以选择配置 binder 来创建和配置 dead-letter 队列(DLQ)(以及 dead-letter 交换DLX)。死信队列具有目标的 name,附加.dlq。如果启用了重试(maxAttempts > 1),则失败的消息将被传递到 DLQ。如果禁用重试(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。另外,republishToDlq会导致 binder 将失败的消息发布到 DLQ(而不是拒绝它);这样可以在_header 中的消息中添加其他信息,例如x-exception-stacktrace标头中的堆栈跟踪。此选项不需要重试启用;只需一次尝试即可重新发布失败的消息。从 version 1.2 开始,您可以配置重新发布的消息的传递模式;见 property republishDeliveryMode

requeueRejected设置为true将导致消息被重新排队并连续重新传送,除非故障问题是暂时的,否则这可能不是您想要的。通常,最好通过将maxAttempts设置为大于 1 来启用 binder 中的重试,或将republishToDlq设置为true

有关这些 properties 的更多信息,请参见第 38.3.1 节,“RabbitMQ Binder Properties”

framework 没有提供消耗 dead-letter 消息的任何标准机制(或者 re-route 消息返回主队列)。一些选项在第 38.6 节,“Dead-Letter 队列处理”中描述。

当在 Spring Cloud Stream application 中使用多个 RabbitMQ binders 时,禁用'RabbitAutoConfiguration'以避免将RabbitAutoConfiguration应用于两个 binders 的相同配置非常重要。

从 version 1.3 开始,RabbitMessageChannelBinder为 non-transactional 生成器创建一个内部ConnectionFactory副本,以避免在共享时消费者死锁,由于 Broker 上的Memory 报警而阻塞了高速缓存连接。

38.3 Configuration 选项

此部分包含特定于 RabbitMQ Binder 和绑定 channels 的设置。

有关常规 binding configuration 选项和 properties,请参阅Spring Cloud Stream 核心文档

38.3.1 RabbitMQ Binder Properties

默认情况下,RabbitMQ binder 使用 Spring Boot 的ConnectionFactory,因此它支持 RabbitMQ 的所有 Spring Boot configuration 选项。 (对于 reference,请参阅Spring Boot 文档 .) RabbitMQ configuration 选项使用spring.rabbitmq前缀。

除了 Spring Boot 选项之外,RabbitMQ binder 还支持以下 properties:

默认值:空。

默认值:空。

默认值:1(BESTLEVEL)。

38.3.2 RabbitMQ Consumer Properties

以下 properties 仅适用于 Rabbit 使用者,必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.为前缀。

默认值:AUTO

默认值:false

默认值:#

默认值:true

默认值:prefix+destination.dlq

默认值:'前缀 DLX'

默认值:destination

默认值:true

默认值:false

默认值:none

默认值:none

默认值:no expiration

默认值:false

默认值:no limit

默认值:no limit

默认值:none

默认值:no limit

默认值:true

默认值:true

默认值:true

默认值:topic

默认值:false

默认值:no expiration

默认值:5000

默认值:['*'](所有 headers)。

默认值:false

默认值:1

默认值:no limit

默认值:no limit

默认值:1

默认值:“”。

默认值:false。

默认值:5000

默认值:false

默认值:DeliveryMode.PERSISTENT

默认值:false

默认值:false

默认值:no limit

默认值:1

38.3.3 Rabbit Producer Properties

以下 properties 仅适用于 Rabbit 生成器,必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.为前缀。

默认值:false

默认值:false

默认值:100

默认值:#

默认值:true

默认值:false

默认值:prefix+destination.dlq

默认值:'前缀 DLX'

默认值:destination

默认值:true

默认值:未设置x-delay标头。

默认值:false

默认值:PERSISTENT

默认值:none

默认值:none

默认值:no expiration

默认值:no limit

默认值:no limit

默认值:none

默认值:no limit

默认值:true

默认值:true

默认值:topic

默认值:no expiration

默认值:['*'](所有 headers)。

默认值:false

默认值:no limit

默认值:no limit

默认值:“”。

默认值:false。

默认值:destinationdestination-<partition>表示分区目的地。

默认值:false

默认值:no limit

在 RabbitMQ 的情况下,content type headers 可以由外部 applications 设置。 Spring Cloud Stream 支持它们作为扩展内部协议的一部分,用于任何类型的传输(包括传输,例如 Kafka,通常不支持 headers)。

38.4 使用 RabbitMQ Binder 重试

38.4.1 概述

在 binder 中启用重试时,listener 容器线程将暂停以配置任何后退时段。当单个 consumer 需要严格的 ordering 时,这可能很重要,但对于其他用例,它可以防止在该线程上处理其他消息。使用 binder 重试的另一种方法是使用 time 设置死字法以生活在 dead-letter 队列(DLQ)上,以及 DLQ 本身上的 dead-letter configuration。有关此处讨论的 properties 的更多信息,请参见第 38.3.1 节,“RabbitMQ Binder Properties”。 Example configuration 以启用此 feature:

要强制消息为 dead-lettered,要么抛出AmqpRejectAndDontRequeueException,要么将requeueRejected设置为true并抛出任何 exception。

循环将继续没有结束,这对于瞬态问题很好,但是你可能想在经过一些尝试后放弃。幸运的是,RabbitMQ 提供x-death标头,允许您确定发生了多少个循环。

要在放弃后确认消息,请抛出ImmediateAcknowledgeAmqpException

38.4.2 全部放在一起

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此 configuration 创建一个 exchange myDestination,其队列myDestination.consumerGroup绑定到带有通配符路由 key #的 topic 交换。它创建一个 DLQ 绑定到直接交换DLX与路由 key myDestination.consumerGroup。当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,消息到期并使用 queue name 作为路由 key 路由到原始队列。

Spring Boot application.

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-death标头中的 count property 是Long

38.5 错误 Channels

从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且可以配置为将异步 producer 发送失败发送到错误 channel。有关更多信息,请参见名为“Message Channel Binder and Error Channels”的部分

使用 rabbitmq,有两种类型的发送失败:

后者很少见;引用 RabbitMQ 文档“[228]只有在负责队列的 Erlang process 中发生内部错误时才会被传递。”

除了中所述启用 producer 错误 channels 之外,如果连接工厂配置正确,RabbitMQ binder 将仅向 channels 发送消息:

将 spring boot configuration 用于连接工厂时,请设置 properties:

返回消息的ErrorMessage的有效负载是带有 properties 的ReturnedAmqpMessageException

对于否定确认的确认,有效负载是带有 properties 的NackedAmqpMessageException

没有自动处理这些 exceptions(例如发送到Dead-Letter 队列);您可以使用自己的 Spring Integration 流来使用这些 exceptions。

38.6 Dead-Letter 队列处理

由于无法预测用户如何处理 dead-lettered 消息,framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始队列。但是,如果问题是一个永久性问题,那么可能会导致无限循环。以下spring-boot application 是如何将这些消息重新路由回原始队列的示例,但在三次尝试后将它们移动到第三个“停车场”队列。第二个 example 使用RabbitMQ 延迟消息交换为重新排队的消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用@RabbitListener从 DLQ 接收消息,您也可以在批处理 process 中使用RabbitTemplate.receive()

这些示例假设原始目标是so8400in,consumer group 是so8400

38.6.1 Non-Partitioned 目的地

前两个示例是目标**未分区的情况。

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

38.6.2 分区目的地

对于分区目标,所有分区都有一个 DLQ,我们从 headers 确定原始队列。

republishToDlq=false

republishToDlqfalse时,RabbitMQ 使用包含有关原始目标的信息的x-death标头将消息发布到 DLX/DLQ。

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

republishToDlq=true

republishToDlqtrue时,重新发布恢复器会将原始交换和路由 key 添加到 headers。

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}
首页