41. RabbitMQ Binder

41.1 Usage

要使用 RabbitMQ 绑定器,可以通过使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,您可以使用 Spring Cloud Stream RabbitMQ Starter,如下所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

41.2 RabbitMQBinder 概述

以下简化图显示了 RabbitMQ 绑定程序的工作方式:

图 41.1. RabbitMQBinder

默认情况下,RabbitMQ Binder 实现将每个目标 Map 到TopicExchange。对于每个 Consumer 组,Queue都绑定到该TopicExchange。每个使用者实例都有对应于其组Queue的 RabbitMQ Consumer实例。对于分区的生产者和使用者,队列以分区索引为后缀,并使用分区索引作为路由键。对于匿名使用者(没有group属性的使用者),将使用自动删除队列(具有随机的唯一名称)。

通过使用可选的autoBindDlq选项,您可以配置 Binder 以创建和配置死信队列(DLQ)(以及死信交换机DLX以及路由基础结构)。默认情况下,死信队列具有目标名称,后跟.dlq。如果启用了重试(maxAttempts > 1),则在重试用尽后,失败的消息将传递到 DLQ。如果禁用了重试(maxAttempts = 1),则应将requeueRejected设置为false(默认值),以便将失败的消息路由到 DLQ,而不是重新排队。另外,republishToDlq使绑定程序将失败的消息发布到 DLQ(而不是拒绝它)。通过此功能,可以将其他信息(例如x-exception-stacktrace头中的堆栈跟踪)添加到头中的消息中。此选项不需要启用重试。只需尝试一次,您就可以重新发布失败的消息。从 1.2 版开始,您可以配置重新发布邮件的传递模式。参见republishDeliveryMode property

Tip

requeueRejected设置为true(使用republishToDlq=false)会导致消息被重新排队并连续重新发送,除非失败的原因是短暂的,否则这可能不是您想要的。通常,应通过将maxAttempts设置为大于 1 或将republishToDlq设置为true来在 Binder 中启用重试。

有关这些属性的更多信息,请参见第 41.3.1 节“ RabbitMQBinder 属性”

该框架没有提供任何标准机制来使用死信消息(或将其重新路由回主队列)。 第 41.6 节“死信队列处理”中描述了一些选项。

Note

当在 Spring Cloud Stream 应用程序中使用多个 RabbitMQ 绑定程序时,重要的是禁用“ RabbitAutoConfiguration”以避免将来自RabbitAutoConfiguration的相同配置应用于两个绑定程序。您可以使用@SpringBootApplicationComments 排除该类。

从 2.0 版开始,RabbitMessageChannelBinderRabbitTemplate.userPublisherConnection属性设置为true,以便非事务生产方避免对使用方造成死锁,如果由于代理上的memory alarm而阻止了缓存的连接,则可能会发生死锁。

Note

当前,仅multiplex使用者(一个使用者正在收听多个队列)仅受消息驱动的使用者支持;被轮询的使用者只能从单个队列中检索消息。

41.3 配置选项

本节包含特定于 RabbitMQBinder 和绑定通道的设置。

有关常规绑定配置选项和属性,请参见Spring Cloud Stream 核心文档

41.3.1 RabbitMQBinder 属性

默认情况下,RabbitMQ 绑定器使用 Spring Boot 的ConnectionFactory。一致地,它支持 RabbitMQ 的所有 Spring Boot 配置选项。 (有关参考,请参见Spring Boot 文档)。 RabbitMQ 配置选项使用spring.rabbitmq前缀。

除了 Spring Boot 选项之外,RabbitMQ 绑定器还支持以下属性:

Default: empty.

Default: empty.

默认值:1(BEST_LEVEL)。

默认值:无(Spring AMQP 默认值)。

41.3.2 RabbitMQConsumer 属性

以下属性仅适用于 RabbitConsumer,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.consumer.作为前缀。

默认值:AUTO

默认值:false

默认值:#

默认值:true

默认值:无-broker 将生成随机的 Consumer 标签。

默认值:prefix+destination.dlq

Default: 'prefix+DLX'

Default: 'direct'

默认值:destination

默认值:true

默认值:true

默认值:false

默认值:none

默认值:none

默认值:no expiration

默认值:false

默认值:no limit

默认值:no limit

默认值:none

默认值:none

默认值:no limit

默认值:true

默认值:true

默认值:true

默认值:topic

默认值:false

默认值:no expiration

Default: 5000

默认值:['*'](所有 Headers)。

默认值:false

默认值:1

默认值:no limit

默认值:no limit

默认值:none

默认值:false

默认值:none

默认值:1

Default: "".

默认值:3

Default: false.

默认值:5000

默认值:false

默认值:DeliveryMode.PERSISTENT

Default: false

默认值:false

默认值:no limit

默认值:1

41.3.3 高级侦听器容器配置

要设置未公开为绑定程序或绑定属性的侦听器容器属性,请向应用程序上下文中添加类型为ListenerContainerCustomizer的单个 bean。将设置 Binder 和绑定属性,然后将调用定制程序。定制程序(configure()方法)具有队列名称以及使用者组作为参数。

41.3.4 Rabbit Producer 属性

以下属性仅适用于 Rabbit 生产者,并且必须以spring.cloud.stream.rabbit.bindings.<channelName>.producer.作为前缀。

默认值:false

默认值:false

默认值:100

默认值:10000

默认值:5000

默认值:#

默认值:true

默认值:false

默认值:prefix+destination.dlq

Default: 'prefix+DLX'

Default: 'direct'

默认值:destination

默认值:true

默认值:true

默认值:未设置x-delayHeaders。

默认值:false

默认值:PERSISTENT

默认值:none

默认值:none

默认值:no expiration

默认值:no limit

默认值:no limit

默认值:none

默认值:no limit

默认值:true

默认值:true

默认值:topic

默认值:no expiration

默认值:['*'](所有 Headers)。

默认值:false

默认值:no limit

默认值:no limit

默认值:none

Default: "".

Default: false.

默认值:destinationdestination-<partition>(用于分区目标)。

默认值:false

默认值:no limit

Note

对于 RabbitMQ,可以由外部应用程序设置 Content TypeHeaders。 Spring Cloud Stream 支持它们作为扩展内部协议的一部分,该协议用于任何类型的传输,包括不支持 Headers 的传输,例如 Kafka(0.11 之前的版本)。

41.4 使用 RabbitMQBinder 重试

当在 Binder 中启用重试时,侦听器容器线程将在配置的任何退避期间暂停。当需要单个 Consumer 进行严格 Order 时,这可能很重要。但是,对于其他用例,它阻止在该线程上处理其他消息。使用 Binder 重试的另一种方法是设置带有时间的无效字母,以保留在无效字母队列(DLQ)中,并在 DLQ 本身上进行无效字母配置。有关此处讨论的属性的更多信息,请参见“ 第 41.3.1 节“ RabbitMQBinder 属性””。您可以使用以下示例配置来启用此功能:

要强制对消息进行死信处理,请抛出AmqpRejectAndDontRequeueException或将requeueRejected设置为true(默认值)并抛出任何异常。

循环无休止地 continue 进行,这对于瞬态问题很好,但是您可能需要在尝试几次后放弃。幸运的是,RabbitMQ 提供了x-deathHeaders,可让您确定发生了多少个循环。

要在放弃后确认消息,请抛出ImmediateAcknowledgeAmqpException

41.4.1 放在一起

以下配置创建一个具有队列myDestination.consumerGroup的交换myDestination,队列myDestination.consumerGroup绑定到具有通配符路由键#的主题交换:

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

此配置创建绑定到使用路由密钥myDestination.consumerGroup的直接交换机(DLX)的 DLQ。当邮件被拒绝时,它们将被路由到 DLQ。 5 秒后,该消息到期,并通过使用队列名称作为路由键将其路由到原始队列,如以下示例所示:

Spring Boot 应用程序.

@SpringBootApplication
@EnableBinding(Sink.class)
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in, @Header(name = "x-death", required = false) Map<?,?> death) {
        if (death != null && death.get("count").equals(3L)) {
            // giving up - don't send to DLX
            throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
        }
        throw new AmqpRejectAndDontRequeueException("failed");
    }

}

请注意,x-deathHeaders 中的 count 属性是Long

41.5 错误通道

从版本 1.3 开始,绑定程序无条件地将异常发送到每个使用者目的地的错误通道,也可以将其配置为将异步生产者发送失败消息发送到错误通道。有关更多信息,请参见“ ???”。

RabbitMQ 有两种类型的发送失败:

后者很少见。根据 RabbitMQ 文档,“ [仅在负责队列的 Erlang 进程中发生内部错误时,才会传递[nack]”。

除了启用生产者错误通道(如“ ???”中所述)外,RabbitMQ 绑定器仅在正确配置了连接工厂的情况下才将消息发送到通道,如下所示:

将 Spring Boot 配置用于连接工厂时,请设置以下属性:

返回消息的ErrorMessage的有效负载是具有以下属性的ReturnedAmqpMessageException

对于否定确认的确认,有效负载是具有以下属性的NackedAmqpMessageException

无法自动处理这些异常(例如发送到dead-letter queue)。您可以通过自己的 Spring Integration 流使用这些异常。

41.6 死信队列处理

因为您无法预期用户将如何处置死信,所以该框架没有提供任何标准机制来处理它们。如果死信的原因是短暂的,则您可能希望将消息路由回原始队列。但是,如果问题是永久性问题,则可能导致无限循环。下面的 Spring Boot 应用程序显示了一个示例,该示例演示如何将这些消息路由回原始队列,但在尝试三次后将其移至第三个“停车场”队列。第二个示例使用RabbitMQ 延迟消息交换为重新排队的消息引入延迟。在此示例中,每次尝试的延迟都会增加。这些示例使用@RabbitListener接收来自 DLQ 的消息。您也可以在批处理过程中使用RabbitTemplate.receive()

这些示例假定原始目的地为so8400in,而使用者组为so8400

41.6.1 非分区目标

前两个示例适用于目标未分区的情况:

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

41.6.2 分区目标

对于分区目标,所有分区都有一个 DLQ。我们从标题确定原始队列。

republishToDlq=false

republishToDlqfalse时,RabbitMQ 使用x-deathHeaders 将消息发布到 DLX/DLQ,该 Headers 包含有关原始目的地的信息,如以下示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@SuppressWarnings("unchecked")
	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

republishToDlq=true

republishToDlqtrue时,重新发布的恢复程序会将原始交换和路由密钥添加到 Headers 中,如以下示例所示:

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Hit enter to terminate");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

41.7 使用 RabbitMQBinder 进行分区

RabbitMQ 不支持本地分区。

有时,将数据发送到特定分区是有利的-例如,当您要严格 Order 消息处理时,特定 Client 的所有消息都应转到同一分区。

RabbitMessageChannelBinder通过将每个分区的队列绑定到目标交换机来提供分区。

以下 Java 和 YAML 示例显示了如何配置生产者:

Producer.

@SpringBootApplication
@EnableBinding(Source.class)
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml.

spring:
      cloud:
        stream:
          bindings:
            output:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

Note

前例中的配置使用默认分区(key.hashCode() % partitionCount)。根据键值,这可能会或可能不会提供适当的平衡算法。您可以使用partitionSelectorExpressionpartitionSelectorClass属性覆盖此默认设置。

只有在部署生产方时需要供应方队列时,才需要required-groups属性。否则,发送到分区的所有消息都将丢失,直到部署了相应的使用者为止。

以下配置提供了主题交换:

该交换绑定了以下队列:

以下绑定将队列与交换关联:

以下 Java 和 YAML 示例是前面示例的 continue,并显示了如何配置使用者:

Consumer.

@SpringBootApplication
@EnableBinding(Sink.class)
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
        System.out.println(in + " received from queue " + queue);
    }

}

application.yml.

spring:
      cloud:
        stream:
          bindings:
            input:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0

Tip

RabbitMessageChannelBinder不支持动态缩放。每个分区至少必须有一个使用者。使用者的instanceIndex用于指示使用哪个分区。诸如 Cloud Foundry 之类的平台只能有一个实例带有instanceIndex

首页