39. RabbitMQ Binder

39.1 用法

要使用 RabbitMQ binder,可以使用以下 Maven 坐标将其添加到 Spring Cloud Stream application:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

39.2 RabbitMQ Binder 概述

以下简化图显示了 RabbitMQ binder 的运行方式:

图 1_.RabbitMQ Binder

默认情况下,RabbitMQ Binder implementation maps 将每个目标映射到TopicExchange。对于每个 consumer group,Queue绑定到TopicExchange。每个 consumer 实例的 group 的Queue都有一个对应的 RabbitMQ Consumer实例。对于分区生成器和使用者,队列以分区索引为后缀,并使用分区索引作为路由 key。对于匿名使用者(没有group property 的用户),使用 auto-delete 队列(具有随机的唯一 name)。

通过使用可选的autoBindDlq选项,您可以配置 binder 以创建和配置 dead-letter 队列(DLQ)(以及 dead-letter 交换DLX,以及路由基础结构)。默认情况下,死信队列具有目标的 name,并附加.dlq。如果启用了重试(maxAttempts > 1),则在重试耗尽后,失败的消息将传递到 DLQ。如果禁用重试(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便将失败的消息路由到 DLQ,而不是 re-queued。此外,republishToDlq会导致 binder 将失败的消息发布到 DLQ(而不是拒绝它)。此 feature 允许将其他信息(例如x-exception-stacktrace标题中的堆栈跟踪)添加到 headers 中的消息中。此选项不需要重试。只需一次尝试,您就可以重新发布失败的消息。从 version 1.2 开始,您可以配置重新发布的消息的传递模式。见republishDeliveryMode property

requeueRejected设置为true(使用republishToDlq=false)会导致消息 re-queued 并连续重新传递,这可能不是您想要的,除非失败的原因是暂时的。通常,您应该通过将maxAttempts设置为大于 1 或将republishToDlq设置为true来启用 binder 中的重试。

有关这些 properties 的更多信息,请参见第 39.3.1 节,“RabbitMQ Binder Properties”

framework 没有提供消耗 dead-letter 消息的任何标准机制(或者 re-route 消息返回主队列)。一些选项在第 39.6 节,“Dead-Letter 队列处理”中描述。

当在 Spring Cloud Stream application 中使用多个 RabbitMQ binders 时,禁用'RabbitAutoConfiguration'以避免从RabbitAutoConfiguration应用于两个 binders 的相同配置非常重要。您可以使用@SpringBootApplication annotation 排除 class。

从 version 2.0 开始,RabbitMessageChannelBinder _set 将RabbitTemplate.userPublisherConnection property 设置为true,以便 non-transactional 生成器避免消费者死锁,如果缓存连接因 broker 上的memory 警报而被阻止,则可能发生死锁。

39.3 Configuration 选项

此部分包含特定于 RabbitMQ Binder 和绑定 channels 的设置。

有关常规 binding configuration 选项和 properties,请参阅Spring Cloud Stream 核心文档

39.3.1 RabbitMQ Binder Properties

默认情况下,RabbitMQ binder 使用 Spring Boot 的ConnectionFactory。因此,它支持 RabbitMQ 的所有 Spring Boot configuration 选项。 (对于 reference,请参阅Spring Boot 文档)。 RabbitMQ configuration 选项使用spring.rabbitmq前缀。

除了 Spring Boot 选项之外,RabbitMQ binder 还支持以下 properties:

默认值:空。

默认值:空。

默认值:1(BESTLEVEL)。

默认值:none(Spring AMQP 默认值)。

39.3.2 RabbitMQ Consumer Properties

以下 properties 仅适用于 Rabbit 使用者,必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.为前缀。

默认值:AUTO

默认值:false

默认值:#

默认值:true

默认值:prefix+destination.dlq

默认值:'前缀 DLX'

默认值:destination

默认值:true

默认值:false

默认值:none

默认值:none

默认值:no expiration

默认值:false

默认值:no limit

默认值:no limit

默认值:none

默认值:no limit

默认值:true

默认值:true

默认值:true

默认值:topic

默认值:false

默认值:no expiration

默认值:5000

默认值:['*'](所有 headers)。

默认值:false

默认值:1

默认值:no limit

默认值:no limit

默认值:none

默认值:false

默认值:1

默认值:“”。

默认值:3

默认值:false。

默认值:5000

默认值:false

默认值:DeliveryMode.PERSISTENT

默认值:false

默认值:false

默认值:no limit

默认值:1

39.3.3 Rabbit Producer Properties

以下 properties 仅适用于 Rabbit 生成器,必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.为前缀。

默认值:false

默认值:false

默认值:100

默认值:10000

默认值:5000

默认值:#

默认值:true

默认值:false

默认值:prefix+destination.dlq

默认值:'前缀 DLX'

默认值:destination

默认值:true

默认值:未设置x-delay标头。

默认值:false

默认值:PERSISTENT

默认值:none

默认值:none

默认值:no expiration

默认值:no limit

默认值:no limit

默认值:none

默认值:no limit

默认值:true

默认值:true

默认值:topic

默认值:no expiration

默认值:['*'](所有 headers)。

默认值:false

默认值:no limit

默认值:no limit

默认值:none

默认值:“”。

默认值:false。

默认值:destinationdestination-<partition>表示分区目的地。

默认值:false

默认值:no limit

在 RabbitMQ 的情况下,content type headers 可以由外部 applications 设置。 Spring Cloud Stream 支持它们作为扩展内部协议的一部分,用于任何类型的传输 - 包括传输,例如 Kafka(在 0.11 之前),本身不支持 headers。

39.4 使用 RabbitMQ Binder 重试

在 binder 中启用重试时,listener 容器线程将暂停以配置任何后退时段。当单个 consumer 需要严格的 ordering 时,这可能很重要。但是,对于其他用例,它会阻止在该线程上处理其他消息。使用 binder 重试的另一种方法是使用 time 设置死字法以生活在 dead-letter 队列(DLQ)上以及 DLQ 本身上的 dead-letter configuration。有关此处讨论的 properties 的更多信息,请参阅“第 39.3.1 节,“RabbitMQ Binder Properties””。您可以使用以下 example configuration 来启用此 feature:

要强制消息为 dead-lettered,要么抛出AmqpRejectAndDontRequeueException或将requeueRejected设置为true(默认值)并抛出任何 exception。

循环继续没有结束,这对于瞬态问题很好,但是你可能想在经过一些尝试后放弃。幸运的是,RabbitMQ 提供了x-death标头,可以让您确定发生了多少个循环。

要在放弃后确认消息,请抛出ImmediateAcknowledgeAmqpException

39.4.1 全部放在一起

以下 configuration 创建一个 exchange myDestination,其队列myDestination.consumerGroup绑定到带有通配符路由 key #的 topic 交换:

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此 configuration 创建一个 DLQ 绑定到直接交换(DLX),路由 key 为myDestination.consumerGroup。当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,消息过期并使用 queue name 作为路由 key 路由到原始队列,如以下 example 所示:

Spring Boot application.

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-death标头中的 count property 是Long

39.5 错误 Channels

从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且还可以配置为将异步 producer 发送失败发送到错误 channel。有关详细信息,请参阅“???”。

RabbitMQ 有两种类型的发送失败:

后者很少见。根据 RabbitMQ 文档“只有在负责队列的 Erlang process 中发生内部错误时才会传递”[258]。“

除了启用 producer 错误 channels(如“???”中所述)之外,如果连接工厂配置正确,RabbitMQ binder 仅向 channel 发送消息,如下所示:

将 Spring Boot configuration 用于连接工厂时,请设置以下 properties:

返回消息的ErrorMessage的有效负载是带有以下 properties 的ReturnedAmqpMessageException

对于否定确认的确认,有效负载是NackedAmqpMessageException,具有以下 properties:

没有自动处理这些 exceptions(例如发送到dead-letter 队列)。您可以使用自己的 Spring Integration 流使用这些 exceptions。

39.6 Dead-Letter 队列处理

因为您无法预测用户将如何处理 dead-lettered 消息,所以 framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始队列。但是,如果问题是一个永久性问题,那么可能会导致无限循环。以下 Spring Boot application 显示了如何将这些消息重新路由回原始队列的示例,但在三次尝试后将它们移动到第三个“停车场”队列。第二个 example 使用RabbitMQ 延迟消息交换为 re-queued 消息引入延迟。在这个例子中,每次尝试的延迟都会增加。这些示例使用@RabbitListener来接收来自 DLQ 的消息。您还可以在批处理 process 中使用RabbitTemplate.receive()

这些示例假设原始目标是so8400in,consumer group 是so8400

39.6.1 Non-Partitioned 目的地

前两个示例是针对目标未**分区的时间:

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

39.6.2 分区目的地

对于分区目标,所有分区都有一个 DLQ。我们从 headers 确定原始队列。

republishToDlq=false

republishToDlqfalse时,RabbitMQ 将消息发布到 DLX/DLQ,并带有包含有关原始目标的信息的x-death标头,如下面的示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

republishToDlq=true

republishToDlqtrue时,重新发布恢复器会将原始交换和路由 key 添加到 headers,如下面的示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

39.7 使用 RabbitMQ Binder 进行分区

RabbitMQ 本身不支持分区。

有时,将数据发送到特定分区是有利的 - 例如,当您要严格 order 消息处理时,特定客户的所有消息都应该转到同一分区。

RabbitMessageChannelBinder通过将每个分区的队列绑定到目标交换机来提供分区。

以下 Java 和 YAML 示例显示了如何配置 producer:

监制.

@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml.

spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前面 example 中的 configuration 使用默认分区(key.hashCode() % partitionCount)。根据 key 值,这可能会也可能不会提供适当平衡的算法。您可以使用partitionSelectorExpressionpartitionSelectorClass properties 覆盖此默认值。

只有在部署 producer 时需要配置 consumer 队列时,才需要required-groups property。否则,在部署相应的 consumer 之前,发送到分区的任何消息都将丢失。

以下 configuration 规定 topic 交换:

以下队列绑定到该交换:

以下绑定将队列关联到交换:

以下 Java 和 YAML 示例继续前面的示例并演示如何配置 consumer:

消费者.

@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}

application.yml.

spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

RabbitMessageChannelBinder不支持动态缩放。每个分区必须至少有一个 consumer。 consumer 的instanceIndex用于指示使用哪个分区。像 Cloud Foundry 这样的平台只能有一个带有instanceIndex的实例。

首页