31. Inter-Application 沟通

31.1 连接多个 Application 实例

虽然 Spring Cloud Stream 使单个 Spring Boot 应用程序可以轻松连接到消息传递系统,但 Spring Cloud Stream 的典型方案是创建 multi-application 管道,其中微服务应用程序相互发送数据。您可以通过关联相邻 applications 的输入和输出目标来实现此方案。

假设 Time Source application 的设计 calls 将数据发送到 Log Sink application,您可以在 applications 中使用名为ticktock的 common 目标进行绑定。

Time Source(具有 channel name output)将设置以下 property:

spring.cloud.stream.bindings.output.destination=ticktock

Log Sink(具有 channel name input)将设置以下 property:

spring.cloud.stream.bindings.input.destination=ticktock

31.2 实例索引和实例计数

在放大 Spring Cloud Stream applications 时,每个实例都可以接收有关同一 application 存在多少其他实例以及它自己的实例索引的信息。 Spring Cloud Stream 通过spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex properties 执行此操作。例如,如果有三个 HDFS sink application 实例,则所有三个实例都将spring.cloud.stream.instanceCount设置为3,而各个 applications 将spring.cloud.stream.instanceIndex分别设置为012

当 Spring Cloud Stream applications 通过 Spring Cloud 数据流部署时,这些 properties 会自动配置;当 Spring Cloud Stream applications 独立启动时,必须正确设置这些 properties。默认情况下,spring.cloud.stream.instanceCount1spring.cloud.stream.instanceIndex0

在 scaled-up 场景中,正确配置这两个 properties 对于解决分区行为(见下文)非常重要,并且 order 中的某些 binders(e.g. , Kafka binder)始终需要两个 properties 以确保数据被拆分正确地跨多个 consumer 实例。

31.3 分区

31.3.1 配置分区的输出绑定

输出 binding 配置为通过设置一个且仅有一个partitionKeyExpressionpartitionKeyExtractorClass properties 以及partitionCount property 来发送分区数据。例如,以下是有效且典型的 configuration:

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.output.producer.partitionCount=5

基于上述 example configuration,数据将使用以下逻辑发送到目标分区。

根据partitionKeyExpression计算发送到分区输出 channel 的每条消息的分区 key 的 value。 partitionKeyExpression是一个 SpEL 表达式,它根据出站消息进行计算,以提取分区 key。

如果 SpEL 表达式不足以满足您的需要,您可以通过将 property partitionKeyExtractorClass设置为实现org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy接口的 class 来计算分区 key value。虽然 SpEL 表达式通常应该足够,但更复杂的情况可能会使用自定义 implementation 策略。在这种情况下,property'partitionKeyExtractorClass'可以设置如下:

spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass=com.example.MyKeyExtractor
spring.cloud.stream.bindings.output.producer.partitionCount=5

计算消息 key 后,分区选择 process 将确定目标分区为0partitionCount - 1之间的 value。适用于大多数情况的默认计算基于公式key.hashCode() % partitionCount。这可以在 binding 上自定义,方法是设置要根据'key'(通过partitionSelectorExpression property)计算的 SpEL 表达式,或者设置org.springframework.cloud.stream.binder.PartitionSelectorStrategy implementation(通过partitionSelectorClass property)。

可以指定'partitionSelectorExpression'和'partitionSelectorClass'的 binding level properties,类似于上面示例中指定'partitionKeyExpression'和'partitionKeyExtractorClass'properties 的方式。可以为更高级的方案配置其他 properties,如以下部分所述。

Spring-managed 自定义 PartitionKeyExtractorClass implementations

在上面的 example 中,@在某些情况下,有必要将这样的自定义策略 implementation 创建为 Spring bean,以便能够由 Spring 管理,以便它可以执行依赖注入,属性 binding 等。这可以通过配置它来完成作为 application context 中的 @Bean 并使用完全限定的 class name 作为 bean 的 name,如下面的 example 中所示。

@Bean(name="com.example.MyKeyExtractor")
public MyKeyExtractor extractor() {
    return new MyKeyExtractor();
}

作为 Spring bean,自定义策略受益于 Spring bean 的整个生命周期。例如,如果 implementation 需要直接访问 application context,它可以使实现'ApplicationContextAware'。

配置分区的输入绑定

输入 binding(带 channel name input)配置为通过设置partitioned property 以及 application 本身的instanceIndexinstanceCount properties 来接收分区数据,如下面的 example:

spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount value 表示需要对数据进行分区的 application 实例的总数,instanceIndex必须是多个实例之间0instanceCount - 1之间的唯一 value。实例索引帮助每个 application 实例标识从中接收数据的唯一分区(或者,在 Kafka 的情况下,分区集)。在 order 中正确设置两个值非常重要,以确保消耗所有数据并且 application 实例接收互斥数据集。

虽然在独立的情况下使用多个实例进行分区数据处理的方案可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值以及依赖运行时基础结构来提供信息来显着简化 process。关于实例索引和实例计数。