27. Binder

Spring Cloud Stream 提供 Binder 抽象,用于连接外部中间件的物理目标。本节提供有关 Binder SPI 背后的主要概念,其主要组件和 implementation-specific 详细信息的信息。

27.1 生产者和消费者

图 1_.生产者和消费者

producer 是将消息发送到 channel 的任何 component。 channel 可以通过 Binder 的 Binder implementation 绑定到外部消息 broker。调用bindProducer()方法时,第一个参数是 broker 中目标的 name,第二个参数是 producer 将向其发送消息的本地 channel 实例,第三个参数包含 properties(例如分区 key 表达式)在为该 channel 创建的适配器中使用。

consumer 是从 channel 接收消息的任何 component。与 producer 一样,consumer 的 channel 可以绑定到外部消息 broker。调用bindConsumer()方法时,第一个参数是目标 name,第二个参数提供逻辑消费者组的 name。由消费绑定对于给定的目的地表示的每个 group 接收一个 producer 发送给该目的地(i.e.,publish-subscribe 语义)的每个消息的副本。如果有多个 consumer 实例使用相同的 group name 绑定,则消息将在 consumer 实例之间 load-balanced,以便 producer 发送的每条消息仅被每个 group 中的一个 consumer 实例使用(i.e.,排队语义)。

27.2 Binder SPI

Binder SPI 由许多接口,out-of-the box 实用程序 classes 和发现策略组成,它们为连接到外部中间件提供了可插拔机制。

SPI 的 key 点是Binder接口,它是将输入和输出连接到外部中间件的策略。

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
	Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

	Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

接口已参数化,提供了许多扩展点:

典型的 binder implementation 包含以下内容

kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

27.3 Binder 检测

Spring Cloud Stream 依赖于 Binder SPI 的 implementations 来执行连接 channels 到消息代理的任务。每个 Binder implementation 通常连接到一种类型的消息传递系统。

27.3.1 Classpath Detection

默认情况下,Spring Cloud Stream 依赖于 Spring Boot 的 auto-configuration 来配置 binding process。如果在 classpath 上找到一个 Binder implementation,Spring Cloud Stream 将自动使用它。对于 example,旨在仅绑定到 RabbitMQ 的 Spring Cloud Stream 项目可以简单地添加以下依赖项:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

有关其他 binder 依赖项的特定 maven 坐标,请参阅该 binder implementation 的文档。

27.4 Classpath 上有多个 Binder

当 classpath 中存在多个 binders 时,application 必须指示每个 channel binding 使用哪个 binder。每个 binder configuration 都包含一个META-INF/spring.binders,这是一个简单的 properties 文件:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

类似 files 为其他提供 binder implementations(e.g,Kafka)存在,和自定义 binder implementations 预计将提供他们,以及。该 key 表示用于 binder implementation 的识别 name,而 value 是 configuration 的 comma-separated 列表 classes 各自含有一个且仅一个类型的org.springframework.cloud.stream.binder.Binder bean 定义。

Binder 选择可以在全球范围内使用spring.cloud.stream.defaultBinder property 通过在每个 channel binding 配置 binder 进行,(e.g. ,spring.cloud.stream.defaultBinder=rabbit)或单独地。例如,处理器 application(具有 channels,其名称分别为inputoutput,分别为 read/write)从 Kafka 读取并写入 RabbitMQ 可以指定以下 configuration:

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

27.5 连接到多个系统

默认情况下,binders 共享 application 的 Spring Boot auto-configuration,以便在 classpath 中找到每个 binder 的一个实例。如果您的 application 应该连接到多个相同类型的 broker,您可以指定多个 binder 配置,每个配置具有不同的环境设置。

打开显式 binder configuration 将完全禁用默认的 binder configuration process。如果这样做,则所有正在使用的 binders 都必须包含在 configuration 中。打算透明地使用 Spring Cloud Stream 的框架可能会创建可由 name 引用的 binder 配置,但不会影响默认的 binder configuration。在_这样做时,binder configuration 可能将defaultCandidate flag 设置为 false,e.g. spring.cloud.stream.binders.<configurationName>.defaultCandidate=false。这表示 configuration 将独立于默认 binder configuration process 而存在。

例如,这是连接到两个 RabbitMQ broker 实例的处理器 application 的典型 configuration:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: foo
          binder: rabbit1
        output:
          destination: bar
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>

27.6 Binder configuration properties

创建自定义 binder 配置时,可以使用以下 properties。它们必须以spring.cloud.stream.binders.<configurationName>为前缀。

默认情况下,它与 configuration name 具有相同的 value。

默认true

默认empty

默认true

首页