32. Inter-Application 沟通

Spring Cloud Stream 支持 applications 之间的通信。 Inter-application 通信是一个涉及多个问题的复杂问题,如以下主题所述:

32.1 连接多个 Application 实例

虽然 Spring Cloud Stream 使单个 Spring Boot 应用程序可以轻松连接到消息传递系统,但 Spring Cloud Stream 的典型方案是创建 multi-application 管道,其中微服务应用程序相互发送数据。您可以通过关联“相邻”应用程序的输入和输出目标来实现此方案。

假设 Time Source application 的设计 calls 将数据发送到 Log Sink application。您可以在 applications 中使用名为ticktock的 common 目标进行绑定。

Time Source(具有 channel name output)将设置以下 property:

spring.cloud.stream.bindings.output.destination=ticktock

Log Sink(具有 channel name input)将设置以下 property:

spring.cloud.stream.bindings.input.destination=ticktock

32.2 实例索引和实例计数

在放大 Spring Cloud Stream applications 时,每个实例都可以接收有关同一 application 存在多少其他实例以及它自己的实例索引的信息。 Spring Cloud Stream 通过spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex properties 执行此操作。例如,如果有三个 HDFS sink application 实例,则所有三个实例的spring.cloud.stream.instanceCount都设置为3,而各个 applications 的spring.cloud.stream.instanceIndex分别设置为012

当 Spring Cloud Stream applications 通过 Spring Cloud 数据流部署时,这些 properties 会自动配置;当 Spring Cloud Stream applications 独立启动时,必须正确设置这些 properties。默认情况下,spring.cloud.stream.instanceCount1spring.cloud.stream.instanceIndex0

在 scaled-up 场景中,正确配置这两个 properties 对于解决分区行为(见下文)非常重要,并且 order 中的某些 binders(对于 example,Kafka binder)始终需要两个 properties 以确保数据被拆分正确地跨多个 consumer 实例。

32.3 分区

Spring Cloud Stream 中的分区包含两个任务:

32.3.1 配置分区的输出绑定

您可以通过设置一个且只有一个partitionKeyExpressionpartitionKeyExtractorName properties 以及partitionCount property 来配置输出 binding 以发送分区数据。

例如,以下是有效且典型的 configuration:

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.output.producer.partitionCount=5

基于该 example configuration,使用以下逻辑将数据发送到目标分区。

根据partitionKeyExpression计算发送到分区输出 channel 的每条消息的分区 key 的 value。 partitionKeyExpression是一个 SpEL 表达式,它根据出站消息进行计算,以提取分区 key。

如果 SpEL 表达式不足以满足您的需要,您可以通过提供org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的 implementation 并将其配置为 bean(通过使用@Bean annotation)来计算分区 key value。如果 Application Context 中有多个类型的 bean,则可以通过使用partitionKeyExtractorName property 指定 name 来进一步过滤它,如下面的 example 所示:

--spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.output.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}

在以前版本的 Spring Cloud Stream 中,您可以通过设置spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass property 来指定org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的 implementation。自 version 2.0 以来,不推荐使用此 property,并且将在以后的 version 中删除对它的支持。

计算消息 key 后,分区选择 process 将目标分区确定为0partitionCount - 1之间的 value。适用于大多数情况的默认计算基于以下公式:key.hashCode() % partitionCount。这可以在 binding 上自定义,方法是将 SpEL 表达式设置为针对'key'(通过partitionSelectorExpression property)进行评估,或者将org.springframework.cloud.stream.binder.PartitionSelectorStrategy的_imple 实现为 bean(通过使用 @Bean annotation)。与PartitionKeyExtractorStrategy类似,当 Application Context 中有多个此类型的 bean 时,可以使用spring.cloud.stream.bindings.output.producer.partitionSelectorName property 进一步过滤它,如下面的 example 所示:

--spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}

在以前版本的 Spring Cloud Stream 中,您可以通过设置spring.cloud.stream.bindings.output.producer.partitionSelectorClass property 来指定org.springframework.cloud.stream.binder.PartitionSelectorStrategy的 implementation。自 version 2.0 以来,不推荐使用此 property,并且将在以后的 version 中删除对它的支持。

32.3.2 配置分区的输入绑定

输入 binding(带 channel name input)配置为通过设置partitioned property 以及 application 本身的instanceIndexinstanceCount properties 来接收分区数据,如下面的示例所示:

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount value 表示应在其间分区数据的 application 实例的总数。 instanceIndex必须是多个实例中的唯一 value,_val介于0instanceCount - 1之间。实例索引帮助每个 application 实例标识从中接收数据的唯一 partition(s)。 binders 需要使用不支持本机分区的技术。例如,对于 RabbitMQ,每个分区都有一个队列,队列 name 包含实例索引。使用 Kafka,如果autoRebalanceEnabledtrue(默认值), Kafka 负责跨实例分发分区,并且不需要这些 properties。如果autoRebalanceEnabled设置为 false,binder 将使用instanceCountinstanceIndex来确定实例订阅的 partition(s(您必须至少具有与实例一样多的分区)。 binder 分配分区而不是 Kafka。如果您希望特定分区的消息始终转到同一实例,这可能很有用。当 binder configuration 需要它们时,重要的是在 order 中正确设置两个值以确保消耗所有数据并且 application 实例接收互斥数据集。

虽然在独立的情况下使用多个实例进行分区数据处理的情况可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值并让您依赖运行时基础结构来显着简化 process。提供有关实例索引和实例计数的信息。