29. Configuration 选项

Spring Cloud Stream 支持常规 configuration 选项以及绑定和 binders 的 configuration。一些 binders 允许额外的 binding properties 支持 middleware-specific features。

Configuration 选项可以通过 Spring Boot 支持的任何机制提供给 Spring Cloud Stream applications。这包括 application arguments,环境变量和 YAML 或.properties files。

29.1 Binding Service Properties

这些 properties 通过org.springframework.cloud.stream.config.BindingServiceProperties公开

  • spring.cloud.stream.instanceCount

    • application 的已部署实例数。必须设置为在 producer 端进行分区。使用 RabbitMQ 时必须在 consumer 端设置,如果autoRebalanceEnabled=false则必须设置 Kafka。

默认值:1

  • spring.cloud.stream.instanceIndex

    • application 的实例索引:从0instanceCount - 1的数字。用于使用 RabbitMQ 进行分区,使用 Kafka 进行autoRebalanceEnabled=false分区。自动在 Cloud Foundry 中设置匹配 application 的实例索引。
  • spring.cloud.stream.dynamicDestinations

    • 可动态绑定的目标列表(例如,在动态路由方案中)。如果设置,则只能绑定列出的目标。

默认值:空(允许绑定任何目标)。

默认值:空。

  • spring.cloud.stream.overrideCloudConnectors

    • 此 property 仅适用于cloud profile 为 active 且 Spring Cloud 连接器随 application 一起提供的情况。如果 property 是false(默认值),则 binder 会检测到合适的绑定服务(对于 example,为 RabbitMQ binder 绑定在 Cloud Foundry 中的 RabbitMQ 服务)并将其用于 creating 连接(通常通过 Spring Cloud 连接器)。当设置为true时,此 property 指示 binders 完全忽略绑定服务并依赖 Spring Boot properties(对于 example,依赖于 RabbitMQ binder 环境中提供的spring.rabbitmq.* properties)。此 property 的典型用法是嵌套在自定义环境当连接到多个系统时中。

默认值:false

  • spring.cloud.stream.bindingRetryInterval

    • 重写 binding 创建之间的间隔(以秒为单位),例如,binder 不支持后期 binding 和 broker(对于 example,Apache Kafka)已关闭。将其设置为零以将此类条件视为致命条件,从而阻止 application 启动。

默认值:30

29.2 Binding Properties

Binding properties 使用spring.cloud.stream.bindings.<channelName>.<property>=<value>的格式提供。 <channelName>表示正在配置的 channel 的 name(对于 example,output表示Source)。

为避免重复,Spring Cloud Stream 支持所有 channel 的设置值,格式为spring.cloud.stream.default.<property>=<value>

在下文中,我们指出我们在哪里省略了spring.cloud.stream.bindings.<channelName>.前缀并仅关注 property name,并理解前缀 ise 包含在运行时。

29.2.1 Common Binding Properties

这些 properties 通过org.springframework.cloud.stream.config.BindingProperties公开

以下 binding properties 可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.为前缀(对于 example,spring.cloud.stream.bindings.input.destination=ticktock)。

可以使用spring.cloud.stream.default前缀(对于 example spring.cloud.stream.default.contentType=application/json)设置默认值。

  • 目的地

    • 绑定中间件上 channel 的目标目标(对于 example,RabbitMQ 交换或 Kafka topic)。如果 channel 绑定为 consumer,则它可以绑定到多个目标,并且目标名称可以指定为 comma-separated String值。如果未设置,则使用 channel name。无法覆盖此 property 的默认 value。
    • channel 的 consumer group。仅适用于入站绑定。见Consumer Groups

默认值:null(表示匿名 consumer)。

默认值:null(不执行类型强制)。

默认值:null(使用默认的 binder,如果存在)。

29.2.2 Consumer Properties

这些 properties 通过org.springframework.cloud.stream.binder.ConsumerProperties公开

以下 binding properties 仅可用于输入绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.为前缀(对于 example,spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以使用spring.cloud.stream.default.consumer前缀设置默认值(对于 example,spring.cloud.stream.default.consumer.headerMode=none)。

  • 并发

    • 入站 consumer 的并发性。

默认值:1

  • 分区

    • consumer 是否从分区的 producer 接收数据。

默认值:false

  • headerMode

    • 设置为none时,禁用输入上的标头解析。仅适用于本机不支持消息 headers 且需要标头嵌入的消息传递中间件。当不支持本机 headers 时__suming 来自 non-Spring Cloud Stream applications 的数据时,此选项很有用。设置为headers时,它使用中间件的本机头机制。设置为embeddedHeaders时,它会将 headers 嵌入到消息有效内容中。

默认值:取决于 binder implementation。

  • maxAttempts

    • 如果处理失败,则尝试处理消息(包括第一个)。设置为1以禁用重试。

默认值:3

  • backOffInitialInterval

    • 重试时的退避初始间隔。

默认值:1000

  • backOffMaxInterval

    • 最大退避间隔。

默认值:10000

  • backOffMultiplier

    • 退避乘数。

默认值:2.0

  • instanceIndex

    • 当设置为大于等于零的 value 时,它允许自定义此 consumer 的实例索引(如果与spring.cloud.stream.instanceIndex不同)。设置为负 value 时,默认为spring.cloud.stream.instanceIndex。有关详细信息,请参阅“第 32.2 节,“实例索引和实例计数””。

默认值:-1

  • instanceCount

    • 当设置为大于等于零的 value 时,它允许自定义此 consumer 的实例计数(如果与spring.cloud.stream.instanceCount不同)。设置为负 value 时,默认为spring.cloud.stream.instanceCount。有关详细信息,请参阅“第 32.2 节,“实例索引和实例计数””。

默认值:-1

  • useNativeDecoding

    • 设置为true时,client library 直接反序列化入站消息,必须相应地对其进行配置(对于 example,设置适当的 Kafka producer value 反序列化器)。使用此 configuration 时,入站消息解组不基于 binding 的contentType。使用本机解码时,producer 负责使用适当的编码器(例如,Kafka producer value 序列化程序)来序列化出站消息。此外,使用本机编码和解码时,将忽略headerMode=embeddedHeaders property,并且不会在消息中嵌入 headers。请参阅 producer property useNativeEncoding

默认值:false

29.2.3 Producer Properties

这些 properties 通过org.springframework.cloud.stream.binder.ProducerProperties公开

以下 binding properties 仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.为前缀(对于 example,spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id)。

可以使用前缀spring.cloud.stream.default.producer(对于 example,spring.cloud.stream.default.producer.partitionKeyExpression=payload.id)设置默认值。

  • partitionKeyExpression

    • 一个 SpEL 表达式,用于确定如何对出站数据进行分区。如果设置,或者设置了partitionKeyExtractorClass,则此 channel 上的出站数据将被分区。必须将partitionCount设置为大于 1 的 value 才能生效。与partitionKeyExtractorClass互斥。见“第 26.6 节,“分区支持””。

默认值:null。

  • partitionKeyExtractorClass

    • 一个PartitionKeyExtractorStrategy 实现。如果设置,或者设置了partitionKeyExpression,则此 channel 上的出站数据将被分区。必须将partitionCount设置为大于 1 的 value 才能生效。与partitionKeyExpression互斥。见“第 26.6 节,“分区支持””。

默认值:null

  • partitionSelectorClass

    • 一个PartitionSelectorStrategy 实现。与partitionSelectorExpression互斥。如果两者都未设置,则选择分区为hashCode(key) % partitionCount,其中key通过partitionKeyExpressionpartitionKeyExtractorClass计算。

默认值:null

  • partitionSelectorExpression

    • 用于自定义分区选择的 SpEL 表达式。与partitionSelectorClass互斥。如果两者都未设置,则选择分区为hashCode(key) % partitionCount,其中key通过partitionKeyExpressionpartitionKeyExtractorClass计算。

默认值:null

  • partitionCount

    • 如果启用了分区,则数据的目标分区数。如果 producer 被分区,则必须设置为大于 1 的 value。在 Kafka 上,它被解释为提示。使用较大的此值和目标 topic 的分区计数。

默认值:1

  • requiredGroups

    • 一个 comma-separated 组的列表,producer 必须确保消息传递,即使它们在创建后也是如此(例如,通过 RabbitMQ 中的 pre-creating 持久队列)。
  • headerMode

    • 设置为none时,它会禁用输出中的标头嵌入。它仅对于本地不支持消息 headers 且需要头嵌入的消息传递中间件有效。当不支持本机_header 时 production non-Spring Cloud Stream applications 的数据时,此选项很有用。设置为headers时,它使用中间件的本机头机制。设置为embeddedHeaders时,它会将 headers 嵌入到消息有效内容中。

默认值:取决于 binder implementation。

  • useNativeEncoding

    • 设置为true时,出站消息由 client library 直接序列化,必须相应地配置(例如,设置适当的 Kafka producer value 序列化程序)。使用此 configuration 时,出站邮件编组不基于 binding 的contentType。当使用本机编码时,consumer 负责使用适当的解码器(例如,Kafka consumer value de-serializer)来反序列化入站消息。此外,使用本机编码和解码时,将忽略headerMode=embeddedHeaders property,并且不会在消息中嵌入 headers。请参阅 consumer property useNativeDecoding

默认值:false

  • errorChannelEnabled

    • 设置为true时,如果 binder 支持异步发送结果,则发送失败将发送到目标的错误 channel。有关详细信息,请参阅“???”。

默认值:false

29.3 使用动态绑定目标

除了使用@EnableBinding定义的 channel 之外,Spring Cloud Stream 允许 applications 将消息发送到动态绑定的目标。例如,当需要在运行时确定目标目标时,这很有用。 Applications 可以使用由@EnableBinding annotation 自动注册的BinderAwareChannelResolver bean 来实现。

'spring.cloud.stream.dynamicDestinations'property 可用于将动态目标名称限制为已知集(白名单)。如果未设置此 property,则可以动态绑定任何目标。

BinderAwareChannelResolver可以直接使用,如下面的 REST 控制器的示例所示,使用路径变量来决定目标 channel:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @PathVariable("target") target,
           @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, target, contentType);
    }

    private void sendMessage(String body, String target, Object contentType) {
        resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}

现在考虑当我们在默认 port(8080)上启动 application 并使用 CURL 发出以下请求时会发生什么:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

目的地,“客户”和“订单”在 broker(在 Rabbit 的交换中或在 Kafka 的 topic 中)创建,名称为“customers”和“orders”,数据将发布到相应的目的地。

BinderAwareChannelResolver是一个 general-purpose Spring Integration DestinationResolver,可以在其他组件中注入 - 例如,在 router 中使用基于传入 JSON 消息的target字段的 SpEL 表达式。以下 example 包含一个读取 SpEL 表达式的 router:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/", method = POST, consumes = "application/json")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, contentType);
    }

    private void sendMessage(Object body, Object contentType) {
        routerChannel().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

    @Bean(name = "routerChannel")
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }
}

Router Sink Application使用此技术创建目标 on-demand。

如果事先知道 channel 名称,则可以将 producer properties 配置为与任何其他目标一样。或者,如果注册NewBindingCallback<> bean,则会在创建 binding 之前调用它。回调采用 binder 使用的扩展 producer properties 的泛型类型。它有一种方法:

void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下 example 显示了如何使用 RabbitMQ binder:

@Bean
public NewBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}

如果需要支持具有多个 binder 类型的动态目标,请使用Object作为泛型类型,并根据需要强制转换extended参数。