30. Binders

Spring Cloud Stream 提供了一个 Binder 抽象,可用于连接到外部中间件上的物理目标。本节提供有关 Binder SPI 背后的主要概念,其主要组件以及特定于实现的详细信息。

30.1 生产者和 Consumer

下图显示了生产者和 Consumer 的一般关系:

图 30.1. 生产者和 Consumer

producers consumers

生产者是将消息发送到通道的任何组件。可以将通道绑定到具有该代理的Binder实现的外部消息代理。调用bindProducer()方法时,第一个参数是代理内目标的名称,第二个参数是生产者向其发送消息的本地通道实例,第三个参数包含要作为对象的属性(例如分区键表达式)在为该通道创建的适配器内使用。

使用者是从通道接收消息的任何组件。与生产者一样,Consumer 的 Channel 可以绑定到外部消息代理。调用bindConsumer()方法时,第一个参数是目标名称,第二个参数提供逻辑 Consumer 组的名称。由给定目标的使用者绑定表示的每个组都接收生产者发送到该目标的每个消息的副本(也就是说,它遵循常规的发布-订阅语义)。如果有多个使用相同组名绑定的使用者实例,那么消息将在这些使用者实例之间进行负载平衡,以便生产者发送的每条消息仅在每个组中的单个使用者实例中被使用(也就是说,遵循正常的排队语义)。

30.2 粘合剂 SPI

Binder SPI 由许多接口,现成的 Util 类和发现策略组成,这些策略提供了用于连接到外部中间件的可插拔机制。

SPI 的关键是Binder接口,这是一种将 Importing 和输出连接到外部中间件的策略。以下清单显示了Binder接口的定义:

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

该接口已参数化,提供了许多扩展点:

  • Importing 和输出绑定目标。从 1.0 版开始,仅支持MessageChannel,但将来打算将其用作扩展点。

  • 扩展的使用者和生产者属性,允许特定的 Binder 实现添加可以以类型安全的方式支持的补充属性。

典型的 Binder 实现包括以下内容:

  • 实现Binder接口的类;

  • Spring @Configuration类与中间件连接基础结构一起创建类型为Binder的 bean。

  • 在 Classpath 上找到的META-INF/spring.binders文件,其中包含一个或多个 Binder 定义,如以下示例所示:

kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

30.3Binder 检测

Spring Cloud Stream 依赖于 Binder SPI 的实现来执行将通道连接到消息代理的任务。每个 Binder 实现通常都连接到一种消息传递系统。

30.3.1Classpath 检测

默认情况下,Spring Cloud Stream 依靠 Spring Boot 的自动配置来配置绑定过程。如果在 Classpath 上找到单个 Binder 实现,Spring Cloud Stream 将自动使用它。例如,旨在仅绑定到 RabbitMQ 的 Spring Cloud Stream 项目可以添加以下依赖项:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

有关其他 Binder 依赖项的特定 Maven 坐标,请参阅该 Binder 实现的文档。

30.4Classpath 上的多个绑定器

当 Classpath 上存在多个绑定程序时,应用程序必须指示将哪个绑定程序用于每个通道绑定。每个 Binder 配置都包含一个META-INF/spring.binders文件,这是一个简单的属性文件,如以下示例所示:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

其他提供的 Binder 实现(例如 Kafka)也存在类似的文件,并且预期的自定义 Binder 实现也将提供它们。关键字表示绑定程序实现的标识名,而该值是用逗号分隔的配置类列表,每个配置类都包含一个且仅一个org.springframework.cloud.stream.binder.Binder类型的 bean 定义。

绑定程序选择可以使用spring.cloud.stream.defaultBinder属性(例如spring.cloud.stream.defaultBinder=rabbit)全局执行,也可以通过在每个通道绑定上配置绑定程序单独进行。例如,从 Kafka 读取并写入 RabbitMQ 的处理器应用程序(具有分别名为inputoutput的通道用于读取和写入)可以指定以下配置:

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

30.5 连接到多个系统

默认情况下,Binder 共享应用程序的 Spring Boot 自动配置,以便创建在 Classpath 上找到的每个 Binder 的一个实例。如果您的应用程序应连接到多个同一类型的代理,则可以指定多个绑定程序配置,每个配置具有不同的环境设置。

Note

打开显式 Binder 配置会完全禁用默认的 Binder 配置过程。如果这样做,则配置中必须包括所有正在使用的 Binder。打算透明地使用 Spring Cloud Stream 的框架可以创建可按名称引用的绑定器配置,但它们不会影响默认的绑定器配置。为此,Binder 配置可以将其defaultCandidate标志设置为 false(例如spring.cloud.stream.binders.<configurationName>.defaultCandidate=false)。这表示独立于默认绑定程序配置过程而存在的配置。

以下示例显示了连接到两个 RabbitMQ 代理实例的处理器应用程序的典型配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: thing1
          binder: rabbit1
        output:
          destination: thing2
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>

30.6 绑定可视化和控件

从 2.0 版开始,Spring Cloud Stream 支持通过 Actuator 端点进行绑定的可视化和控制。

从版本 2.0Actuator 和 Web 开始是可选的,您必须首先添加 Web 依赖项之一,然后手动添加 Actuator 依赖项。下面的示例演示如何为 Web 框架添加依赖项:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

下面的示例演示如何为 WebFlux 框架添加依赖项:

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

您可以如下添加 Actuator 依赖项:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Note

要在 Cloud Foundry 中运行 Spring Cloud Stream 2.0 应用程序,必须将spring-boot-starter-webspring-boot-starter-actuator添加到 Classpath 中。否则,由于运行状况检查失败,该应用程序将无法启动。

您还必须通过设置以下属性来启用bindingsActuator 端点:--management.endpoints.web.exposure.include=bindings

一旦满足这些先决条件。应用程序启动时,您应该在日志中看到以下内容:

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要显示当前绑定,请访问以下 URL:http://<host>:<port>/actuator/bindings

或者,要查看单个绑定,请访问类似于以下内容的 URL 之一:http://<host>:<port>/actuator/bindings/myBindingName

您还可以通过发布到同一 URL 并提供state参数作为 JSON 来停止,开始,暂停和恢复单个绑定,如以下示例所示:

curl -d' {"state":"STOPPED"}'-H“Content Type:application/json” -X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d' {"state":"STARTED"}'-H“Content Type:application/json” -X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d' {"state":"PAUSED"} '-H“Content Type:应用程序/ json” -X POST http://<host>:<port>/actuator/bindings/myBindingName curl -d' {"state":"RESUMED"}'-H“Content Type:应用程序/ json” -X POST http://<host>:<port>/actuator/bindings/myBindingName

Note

PAUSEDRESUMED仅在相应的 Binder 及其基础技术支持时起作用。否则,您会在日志中看到警告消息。当前,仅 KafkaBinder 支持PAUSEDRESUMED状态。

30.7Binder 配置属性

定制 Binder 配置时,以下属性可用。这些属性通过org.springframework.cloud.stream.config.BinderProperties公开

它们必须以spring.cloud.stream.binders.<configurationName>作为前缀。

  • type

    • 资料夹类型。它通常引用在 Classpath 中找到的绑定器之一,尤其是META-INF/spring.binders文件中的键。

默认情况下,它具有与配置名称相同的值。

  • inheritEnvironment

    • 配置是否继承应用程序本身的环境。

默认值:true

  • environment

    • 根可用于定制 Binder 环境的一组属性。设置此属性后,在其中创建 Binder 的上下文不是应用程序上下文的子级。该设置允许在粘合剂组分和应用组分之间完全分离。

默认值:empty

  • defaultCandidate

    • Binder 配置是被视为默认 Binder 的候选者,还是仅在明确引用时才可以使用。此设置允许添加 Binder 配置,而不会干扰默认处理。

默认值:true