28. 主要概念

Spring Cloud Stream 提供了许多抽象和 Primitives,简化了消息驱动的微服务应用程序的编写。本节概述了以下内容:

28.1 应用程序模型

Spring Cloud Stream 应用程序由与中间件无关的核心组成。该应用程序通过 Spring Cloud Stream 注入到外界的 Importing 和输出通道与外界进行通信。通道通过特定于中间件的 Binder 实现连接到外部代理。

图 28.1 Spring Cloud Stream 应用程序

带粘合剂的 SCSt

28.1.1 Fat JAR

可以从您的 IDE 以独立模式运行 Spring Cloud Stream 应用程序以进行测试。要在 Producing 运行 Spring Cloud Stream 应用程序,可以使用为 Maven 或 Gradle 提供的标准 Spring Boot 工具来创建可执行(或“胖”)JAR。有关更多详细信息,请参见Spring Boot 参考指南

28.2Binder 抽象

Spring Cloud Stream 为KafkaRabbit MQ提供了 Binder 实现。 Spring Cloud Stream 还包含一个TestSupportBinder,它使通道保持不变,因此测试可以直接与通道交互,并可靠地 assert 所接收的内容。您还可以使用可扩展的 API 编写自己的 Binder。

Spring Cloud Stream 使用 Spring Boot 进行配置,而 Binder 抽象使 Spring Cloud Stream 应用程序可以灵活地连接到中间件。例如,部署者可以在运行时动态选择通道连接到的目的地(例如 Kafka 主题或 RabbitMQ 交换)。可以通过外部配置属性以及 Spring Boot 支持的任何形式(包括应用程序参数,环境变量以及application.ymlapplication.properties文件)提供此类配置。在第 27 章,Spring Cloud Stream 简介部分的接收器示例中,将spring.cloud.stream.bindings.input.destination应用程序属性设置为raw-sensor-data会使它从raw-sensor-data Kafka 主题或绑定到raw-sensor-data RabbitMQ 交换的队列中读取。

Spring Cloud Stream 自动检测并使用在 Classpath 上找到的绑定器。您可以使用具有相同代码的不同类型的中间件。为此,在构建时包括一个不同的 Binder。对于更复杂的用例,您还可以在应用程序中打包多个 Binder,并在运行时选择 Binder(甚至是否为不同的通道使用不同的 Binder)。

28.3 持久的发布-订阅支持

应用程序之间的通信遵循发布-订阅模型,其中数据通过共享主题进行 Broadcast。在下图中可以看到,该图显示了一组交互的 Spring Cloud Stream 应用程序的典型部署。

图 28.2 Spring Cloud Stream 发布-订阅

SCSt sensors

传感器报告给 HTTP 端点的数据将发送到名为raw-sensor-data的公共目标。从目的地开始,它由计算时间窗平均值的微服务应用程序和另一个将原始数据摄取到 HDFS(Hadoop 分布式文件系统)的微服务应用程序独立处理。为了处理数据,两个应用程序都在运行时将主题声明为其 Importing。

发布-订阅通信模型降低了生产者和使用者的复杂性,并允许在不中断现有流程的情况下将新应用程序添加到拓扑中。例如,在平均计算应用程序的下游,您可以添加一个计算最高温度值以进行显示和监视的应用程序。然后,您可以添加另一个解释相同平均值流以进行故障检测的应用程序。通过共享主题而不是点对点队列进行所有通信可以减少微服务之间的耦合。

尽管发布-订阅消息传递的概念并不新鲜,但是 Spring Cloud Stream 采取了额外的步骤,使其成为其应用程序模型的明智选择。通过使用本机中间件支持,Spring Cloud Stream 还简化了跨不同平台的发布-订阅模型的使用。

28.4 消费群体

尽管发布-订阅模型使通过共享主题轻松连接应用程序变得很重要,但是通过创建给定应用程序的多个实例进行扩展的能力同样重要。这样做时,会将应用程序的不同实例置于竞争的 Consumer 关系中,在该 Consumer 关系中,只有一个实例可以处理给定消息。

Spring Cloud Stream 通过 Consumer 群体的概念对这种行为进行建模。 (Spring Cloud Stream 使用者组与 Kafka 使用者组相似,并受其启发.)每个使用者绑定都可以使用spring.cloud.stream.bindings.<channelName>.group属性来指定组名。对于下图所示的使用者,此属性将设置为spring.cloud.stream.bindings.<channelName>.group=hdfsWritespring.cloud.stream.bindings.<channelName>.group=average

图 28.3. Spring Cloud Stream 消费群

SCSt groups

订阅给定目标的所有组都将收到已发布数据的副本,但是每个组中只有一个成员从该目标接收给定消息。默认情况下,当未指定组时,Spring Cloud Stream 会将应用程序分配给一个匿名且独立的单成员使用者组,该使用者组与所有其他使用者组具有发布-订阅关系。

28.5Consumer 类型

支持两种类型的使用者:

  • 消息驱动(有时称为异步)

  • 轮询(有时称为同步)

在 2.0 版之前,仅支持异步使用者。消息一旦可用,就会被传递,并且有线程可以处理它。

当您希望控制消息的处理速率时,可能需要使用同步使用者。

28.5.1 Durability

与 Spring Cloud Stream 公认的应用程序模型一致,Consumer 组订阅是持久的。也就是说,Binder 实现可确保组订阅是持久的,并且一旦为组创建了至少一个订阅,该组就可以接收消息,即使消息是在组中所有应用程序停止时发送的也是如此。

Note

匿名订阅本质上是非持久的。对于某些 Binder 实现(例如 RabbitMQ),可能具有非持久的组订阅。

通常,在将应用程序绑定到给定目标时,最好始终指定使用者组。扩展 Spring Cloud Stream 应用程序时,必须为其每个 Importing 绑定指定使用者组。这样做可以防止应用程序的实例接收重复的消息(除非需要这种行为,这是不寻常的)。

28.6 分区支持

Spring Cloud Stream 支持在给定应用程序的多个实例之间对数据进行分区。在分区方案中,物理通信介质(例如代理主题)被视为结构化为多个分区。一个或多个生产者应用程序实例将数据发送到多个 Consumer 应用程序实例,并确保由共同 Feature 标识的数据由同一 Consumer 实例处理。

Spring Cloud Stream 提供了一种通用抽象,用于以统一的方式实现分区处理用例。因此,无论代理本身是自然分区(例如,Kafka)还是非自然分区(例如,RabbitMQ),都可以使用分区。

图 28.4. Spring Cloud Stream 分区

SCSt partitioning

分区是有状态处理中的关键概念,对于确保所有相关数据一起处理而言,分区是至关重要的(出于性能或一致性方面的考虑)。例如,在带时间窗的平均计算示例中,重要的是,来自任何给定传感器的所有测量都应由同一应用实例处理。

Note

要设置分区处理方案,必须同时配置数据产生端和数据消耗端。