34. Inter-Application Communication

Spring Cloud Stream 支持应用程序之间的通信。应用程序间通信是一个涉及多个问题的复杂问题,如以下主题所述:

34.1 连接多个应用程序实例

尽管 Spring Cloud Stream 使单个 Spring Boot 应用程序易于连接到消息传递系统,但是 Spring Cloud Stream 的典型方案是创建多应用程序管道,在该管道中,微服务应用程序会相互发送数据。您可以通过关联“相邻”应用程序的 Importing 和输出目标来实现此方案。

假设设计要求 Time Source 应用程序将数据发送到 Log Sink 应用程序。您可以将名为ticktock的公共目标用于两个应用程序中的绑定。

时间源(通道名称为output)将设置以下属性:

spring.cloud.stream.bindings.output.destination=ticktock

日志接收器(通道名称为input)将设置以下属性:

spring.cloud.stream.bindings.input.destination=ticktock

34.2 实例索引和实例计数

在扩展 Spring Cloud Stream 应用程序时,每个实例都可以接收有关同一应用程序还存在多少其他实例以及它自己的实例索引是什么的信息。 Spring Cloud Stream 通过spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex属性进行此操作。例如,如果存在 HDFS 接收器应用程序的三个实例,则所有三个实例的spring.cloud.stream.instanceCount设置为3,而各个应用程序的spring.cloud.stream.instanceIndex设置分别为012

当通过 Spring Cloud Data Flow 部署 Spring Cloud Stream 应用程序时,这些属性会自动配置。当独立启动 Spring Cloud Stream 应用程序时,必须正确设置这些属性。默认情况下,spring.cloud.stream.instanceCount1,而spring.cloud.stream.instanceIndex0

在按比例放大的方案中,正确地配置这两个属性通常对于解决分区行为很重要(请参见下文),并且某些绑定程序(例如,Kafka 绑定程序)始终需要这两个属性,以确保数据在多个使用者实例之间正确划分。

34.3 Partitioning

在 Spring Cloud Stream 中进行分区包括两个任务:

34.3.1 配置用于分区的输出绑定

您可以通过设置其partitionKeyExpressionpartitionKeyExtractorName属性及其partitionCount属性中的一个或仅一个,来配置输出绑定以发送分区数据。

例如,以下是有效的典型配置:

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.output.producer.partitionCount=5

基于该示例配置,通过使用以下逻辑将数据发送到目标分区。

基于partitionKeyExpression为发送到分区输出通道的每个消息计算分区键的值。 partitionKeyExpression是一个 SpEL 表达式,该表达式针对出站消息进行评估以提取分区键。

如果 SpEL 表达式不足以满足您的需要,则可以通过提供org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的实现并将其配置为 Bean(使用@Bean注解)来计算分区键值。如果在应用程序上下文中有一个以上的org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy类型的 bean,则可以通过使用partitionKeyExtractorName属性指定其名称来进一步过滤它,如以下示例所示:

--spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.output.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}

Note

在早期版本的 Spring Cloud Stream 中,您可以通过设置spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass属性来指定org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的实现。从 2.0 版开始,不推荐使用此属性,并且在将来的版本中将不再支持该属性。

一旦计算出消息密钥,分区选择过程就会将目标分区确定为0partitionCount - 1之间的值。适用于大多数情况的默认计算基于以下公式:key.hashCode() % partitionCount。可以在绑定上进行自定义,方法是设置 SpEL 表达式以针对“键”进行评估(通过partitionSelectorExpression属性),也可以将org.springframework.cloud.stream.binder.PartitionSelectorStrategy的实现配置为 Bean(通过使用@Bean 注解)。与PartitionKeyExtractorStrategy相似,当应用程序上下文中有多个这种类型的 bean 可用时,可以使用spring.cloud.stream.bindings.output.producer.partitionSelectorName属性进一步过滤它,如以下示例所示:

--spring.cloud.stream.bindings.output.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}

Note

在早期版本的 Spring Cloud Stream 中,您可以通过设置spring.cloud.stream.bindings.output.producer.partitionSelectorClass属性来指定org.springframework.cloud.stream.binder.PartitionSelectorStrategy的实现。从 2.0 版开始,不推荐使用此属性,并且在将来的版本中将删除对此属性的支持。

34.3.2 配置用于分区的 Importing 绑定

Importing 绑定(通道名称为input)被配置为通过设置其partitioned属性以及应用程序本身的instanceIndexinstanceCount属性来接收分区数据,如以下示例所示:

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount值表示应在其之间分区数据的应用程序实例总数。 instanceIndex在多个实例中必须是唯一值,且值介于0instanceCount - 1之间。实例索引可帮助每个应用程序实例标识其从中接收数据的唯一分区。Binder 要求使用不支持本地分区的技术。例如,对于 RabbitMQ,每个分区都有一个队列,该队列名称包含实例索引。对于 Kafka,如果autoRebalanceEnabledtrue(默认值),则 Kafka 负责在实例之间分配分区,并且这些属性不是必需的。如果autoRebalanceEnabled设置为 false,则绑定器将使用instanceCountinstanceIndex来确定实例所预订的分区(您必须拥有与实例数量一样多的分区)。Binder 分配分区而不是 Kafka。如果您希望特定分区的消息始终发送到同一实例,这可能很有用。当 Binder 配置需要它们时,重要的是正确设置两个值,以确保使用所有数据,并且应用程序实例接收互斥的数据集。

尽管在单独情况下使用多个实例进行分区数据处理可能会很复杂,但 Spring Cloud Dataflow 可以通过正确填充 Importing 和输出值以及让您依靠运行时基础架构来显着简化流程。提供有关实例索引和实例计数的信息。