28. Binder

Spring Cloud Stream 提供 Binder 抽象,用于连接外部中间件的物理目标。本节提供有关 Binder SPI 背后的主要概念,其主要组件和 implementation-specific 详细信息的信息。

28.1 生产者和消费者

下图显示了生产者和消费者的一般关系:

图 1_.生产者和消费者

生产者消费者

producer 是将消息发送到 channel 的任何 component。 channel 可以绑定到外部消息 broker,并为该 broker 添加Binder implementation。调用bindProducer()方法时,第一个参数是 broker 中目标的 name,第二个参数是 producer 发送消息的本地 channel 实例,第三个参数包含 properties(例如分区 key 表达式)在为该 channel 创建的适配器中使用。

consumer 是从 channel 接收消息的任何 component。与 producer 一样,consumer 的 channel 可以绑定到外部消息 broker。调用bindConsumer()方法时,第一个参数是目标 name,第二个参数提供逻辑消费者组的 name。由给定目标的 consumer 绑定表示的每个 group 都会收到 producer 发送到该目标的每条消息的副本(也就是说,它遵循正常的 publish-subscribe 语义)。如果有多个 consumer 实例绑定了相同的 group name,那么这些 consumer 实例中的消息是 load-balanced,这样 producer 发送的每条消息都只被每个 group 中的一个 consumer 实例使用(也就是说,它遵循正常的排队语义) 。

28.2 Binder SPI

Binder SPI 由许多接口,out-of-the box 实用程序 classes 和发现策略组成,这些策略为连接到外部中间件提供了可插拔机制。

SPI 的 key 点是Binder接口,这是一种将输入和输出连接到外部中间件的策略。以下清单显示了Binder接口的定义:

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

接口已参数化,提供了许多扩展点:

  • 输入和输出绑定目标。从 version 1.0 开始,仅支持MessageChannel,但这将在以后用作扩展点。

  • 扩展的 consumer 和 producer properties,允许特定的 Binder implementations 添加可以 type-safe 方式支持的补充 properties。

典型的 binder implementation 包含以下内容:

  • 一个实现Binder接口的 class;

  • Spring @Configuration class,它与中间件连接基础结构一起创建Binder类型的 bean。

  • 在 classpath 中找到包含一个或多个 binder 定义的META-INF/spring.binders文件,如下面的示例所示:

kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

28.3 Binder 检测

Spring Cloud Stream 依赖于 Binder SPI 的 implementations 来执行连接 channels 到消息代理的任务。每个 Binder implementation 通常连接到一种类型的消息传递系统。

28.3.1 Classpath Detection

默认情况下,Spring Cloud Stream 依赖于 Spring Boot 的 auto-configuration 来配置 binding process。如果在 classpath 上找到单个 Binder implementation,则 Spring Cloud Stream 会自动使用它。对于 example,旨在仅绑定到 RabbitMQ 的 Spring Cloud Stream 项目可以添加以下依赖项:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

有关其他 binder 依赖项的特定 Maven 坐标,请参阅该 binder implementation 的文档。

28.4 Classpath 上有多个 Binder

当 classpath 中存在多个 binders 时,application 必须指示每个 channel binding 使用哪个 binder。每个 binder configuration 都包含一个META-INF/spring.binders文件,这是一个简单的 properties 文件,如下面的示例所示:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

其他提供的 binder implementations(例如 Kafka)也存在类似的 files,并且自定义 binder implementations 也可以提供它们。 key 表示 binder implementation 的标识 name,而 value 是 comma-separated @

Binder 选择可以使用spring.cloud.stream.defaultBinder property(对于 example,spring.cloud.stream.defaultBinder=rabbit)进行全局执行,也可以单独执行,方法是在每个 channel binding 上配置 binder。例如,从 Kafka 读取并写入 RabbitMQ 的处理器应用程序(具有分别名为inputoutput的 channel 用于读取和写入)可以指定以下 configuration:

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

28.5 连接到多个系统

默认情况下,binders 共享 application 的 Spring Boot auto-configuration,以便创建 classpath 上找到的每个 binder 的一个实例。如果您的 application 应该连接到多个相同类型的 broker,您可以指定多个 binder 配置,每个配置具有不同的环境设置。

启用显式 binder configuration 会完全禁用默认的 binder configuration process。如果这样做,则所有正在使用的 binder 都必须包含在 configuration 中。打算透明地使用 Spring Cloud Stream 的框架可能会创建可由 name 引用的 binder 配置,但它们不会影响默认的 binder configuration。在_这样做时,binder configuration 可能将defaultCandidate flag 设置为 false(对于 example,spring.cloud.stream.binders.<configurationName>.defaultCandidate=false)。这表示 configuration 独立于默认 binder configuration process 而存在。

以下 example 显示了连接到两个 RabbitMQ broker 实例的处理器 application 的典型 configuration:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: thing1
          binder: rabbit1
        output:
          destination: thing2
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>

28.6 Binding 可视化和控制

由于 version 2.0,Spring Cloud Stream 支持通过 Actuator endpoints 对 Bindings 进行可视化和控制。

从 version 2.0 actuator 和 web 开始是可选的,您必须首先添加一个 web 依赖项,并手动添加 actuator 依赖项。以下 example 显示了如何为 Web framework 添加依赖项:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下 example 显示了如何为 WebFlux framework 添加依赖项:

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

您可以按如下方式添加 Actuator 依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

要在 Cloud Foundry 中 run Spring Cloud Stream 2.0 应用程序,您必须将spring-boot-starter-webspring-boot-starter-actuator添加到 classpath。否则,由于运行状况检查失败,application 将无法启动。

您还必须通过设置以下 property 来启用bindings actuator endpoints:--management.endpoints.web.exposure.include=bindings

一旦满足这些先决条件。 application start 时,您应该在日志中看到以下内容:

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要可视化当前绑定,请访问以下 URL:http://<host>:<port>/actuator/bindings

另外,要查看单个 binding,请访问其中一个类似于以下内容的 URL:http://<host>:<port>/actuator/bindings/myBindingName

您还可以通过发布到同一 URL 同时提供state参数作为 JSON 来停止,启动,暂停和恢复单个绑定,如以下示例所示:

curl -d'{“ state”:“STOPPED”}'-H“Content-Type:application/json”-X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d'{“ state”:“STARTED”}'-H“Content-Type:application/json”-X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d'{“state “:”PAUSED“}'-H”Content-Type:application/json“-X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d'{” state“:”RESUMED“}'-H”Content-Type:application/json“-X POST http://<host>:<port>/actuator/bindings/myBindingName

PAUSEDRESUMED仅在相应的 binder 及其底层技术支持时才起作用。否则,您会在日志中看到警告消息。目前,只有 Kafka binder 支持PAUSEDRESUMED状态。

28.7 Binder Configuration Properties

自定义 binder 配置时,可以使用以下 properties。这些 properties 通过org.springframework.cloud.stream.config.BinderProperties暴露

它们必须以spring.cloud.stream.binders.<configurationName>为前缀。

  • 类型

    • binder 类型。它通常引用 classpath 中找到的 binders 之一 - 特别是META-INF/spring.binders文件中的 key。

默认情况下,它与 configuration name 具有相同的 value。

  • inheritEnvironment

    • configuration 是否继承了 application 本身的环境。

默认值:true

  • 环境

    • 一组 properties 的根,可用于自定义 binder 的环境。设置此 property 时,正在创建 binder 的 context 不是 application context 的子项。此设置允许 binder 组件和 application 组件之间完全分离。

默认值:empty

  • defaultCandidate

    • binder configuration 是否可以被视为默认 binder,或者只能在显式引用时使用。此设置允许添加 binder 配置,而不会干扰默认处理。

默认值:true