32. Inter-Application 沟通
Spring Cloud Stream 支持 applications 之间的通信。 Inter-application 通信是一个涉及多个问题的复杂问题,如以下主题所述:
32.1 连接多个 Application 实例
虽然 Spring Cloud Stream 使单个 Spring Boot 应用程序可以轻松连接到消息传递系统,但 Spring Cloud Stream 的典型方案是创建 multi-application 管道,其中微服务应用程序相互发送数据。您可以通过关联“相邻”应用程序的输入和输出目标来实现此方案。
假设 Time Source application 的设计 calls 将数据发送到 Log Sink application。您可以在 applications 中使用名为ticktock
的 common 目标进行绑定。
Time Source(具有 channel name output
)将设置以下 property:
spring.cloud.stream.bindings.output.destination=ticktock
Log Sink(具有 channel name input
)将设置以下 property:
spring.cloud.stream.bindings.input.destination=ticktock
32.2 实例索引和实例计数
在放大 Spring Cloud Stream applications 时,每个实例都可以接收有关同一 application 存在多少其他实例以及它自己的实例索引的信息。 Spring Cloud Stream 通过spring.cloud.stream.instanceCount
和spring.cloud.stream.instanceIndex
properties 执行此操作。例如,如果有三个 HDFS sink application 实例,则所有三个实例的spring.cloud.stream.instanceCount
都设置为3
,而各个 applications 的spring.cloud.stream.instanceIndex
分别设置为0
,1
和2
。
当 Spring Cloud Stream applications 通过 Spring Cloud 数据流部署时,这些 properties 会自动配置;当 Spring Cloud Stream applications 独立启动时,必须正确设置这些 properties。默认情况下,spring.cloud.stream.instanceCount
是1
,spring.cloud.stream.instanceIndex
是0
。
在 scaled-up 场景中,正确配置这两个 properties 对于解决分区行为(见下文)非常重要,并且 order 中的某些 binders(对于 example,Kafka binder)始终需要两个 properties 以确保数据被拆分正确地跨多个 consumer 实例。
32.3 分区
Spring Cloud Stream 中的分区包含两个任务:
32.3.1 配置分区的输出绑定
您可以通过设置一个且只有一个partitionKeyExpression
或partitionKeyExtractorName
properties 以及partitionCount
property 来配置输出 binding 以发送分区数据。
例如,以下是有效且典型的 configuration:
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.output.producer.partitionCount=5
基于该 example configuration,使用以下逻辑将数据发送到目标分区。
根据partitionKeyExpression
计算发送到分区输出 channel 的每条消息的分区 key 的 value。 partitionKeyExpression
是一个 SpEL 表达式,它根据出站消息进行计算,以提取分区 key。
如果 SpEL 表达式不足以满足您的需要,您可以通过提供org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
的 implementation 并将其配置为 bean(通过使用@Bean
annotation)来计算分区 key value。如果 Application Context 中有多个类型的 bean,则可以通过使用partitionKeyExtractorName
property 指定 name 来进一步过滤它,如下面的 example 所示:
--spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.output.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
return new CustomPartitionKeyExtractorClass();
}
在以前版本的 Spring Cloud Stream 中,您可以通过设置
spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass
property 来指定org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy
的 implementation。自 version 2.0 以来,不推荐使用此 property,并且将在以后的 version 中删除对它的支持。
计算消息 key 后,分区选择 process 将目标分区确定为0
和partitionCount - 1
之间的 value。适用于大多数情况的默认计算基于以下公式:key.hashCode() % partitionCount
。这可以在 binding 上自定义,方法是将 SpEL 表达式设置为针对'key'(通过partitionSelectorExpression
property)进行评估,或者将org.springframework.cloud.stream.binder.PartitionSelectorStrategy
的_imple 实现为 bean(通过使用 @Bean annotation)。与PartitionKeyExtractorStrategy
类似,当 Application Context 中有多个此类型的 bean 时,可以使用spring.cloud.stream.bindings.output.producer.partitionSelectorName
property 进一步过滤它,如下面的 example 所示:
--spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
return new CustomPartitionSelectorClass();
}
在以前版本的 Spring Cloud Stream 中,您可以通过设置
spring.cloud.stream.bindings.output.producer.partitionSelectorClass
property 来指定org.springframework.cloud.stream.binder.PartitionSelectorStrategy
的 implementation。自 version 2.0 以来,不推荐使用此 property,并且将在以后的 version 中删除对它的支持。
32.3.2 配置分区的输入绑定
输入 binding(带 channel name input
)配置为通过设置partitioned
property 以及 application 本身的instanceIndex
和instanceCount
properties 来接收分区数据,如下面的示例所示:
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5
instanceCount
value 表示应在其间分区数据的 application 实例的总数。 instanceIndex
必须是多个实例中的唯一 value,_val介于0
和instanceCount - 1
之间。实例索引帮助每个 application 实例标识从中接收数据的唯一 partition(s)。 binders 需要使用不支持本机分区的技术。例如,对于 RabbitMQ,每个分区都有一个队列,队列 name 包含实例索引。使用 Kafka,如果autoRebalanceEnabled
是true
(默认值), Kafka 负责跨实例分发分区,并且不需要这些 properties。如果autoRebalanceEnabled
设置为 false,binder 将使用instanceCount
和instanceIndex
来确定实例订阅的 partition(s(您必须至少具有与实例一样多的分区)。 binder 分配分区而不是 Kafka。如果您希望特定分区的消息始终转到同一实例,这可能很有用。当 binder configuration 需要它们时,重要的是在 order 中正确设置两个值以确保消耗所有数据并且 application 实例接收互斥数据集。
虽然在独立的情况下使用多个实例进行分区数据处理的情况可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值并让您依赖运行时基础结构来显着简化 process。提供有关实例索引和实例计数的信息。