31. Configuration Options

Spring Cloud Stream 支持常规配置选项以及绑定和 Binder 的配置。一些绑定器使其他绑定属性支持特定于中间件的功能。

可以通过 Spring Boot 支持的任何机制将配置选项提供给 Spring Cloud Stream 应用程序。这包括应用程序参数,环境变量以及 YAML 或.properties 文件。

31.1 绑定服务属性

这些属性通过org.springframework.cloud.stream.config.BindingServiceProperties公开

  • spring.cloud.stream.instanceCount

    • 应用程序已部署实例的数量。必须在生产者端进行分区设置。如果使用autoRebalanceEnabled=false,则必须在使用 RabbitMQ 和 Kafka 时在用户端设置。

默认值:1

  • spring.cloud.stream.instanceIndex

    • 应用程序的实例索引:0instanceCount - 1之间的数字。用于通过 RabbitMQ 和autoRebalanceEnabled=false与 Kafka 进行分区。在 Cloud Foundry 中自动设置以匹配应用程序的实例索引。
  • spring.cloud.stream.dynamicDestinations

    • 可以动态绑定的目的地列表(例如,在动态路由方案中)。如果设置,则只能绑定列出的目的地。

默认值:空(将任何目的地绑定)。

Default: empty.

  • spring.cloud.stream.overrideCloudConnectors

    • 仅当cloud配置文件处于Active状态并且该应用程序提供了 Spring Cloud Connector 时,此属性才适用。如果属性为false(默认值),则绑定器检测到合适的绑定服务(例如,在 Cloud Foundry 中为 RabbitMQ 绑定器绑定的 RabbitMQ 服务)并将其用于创建连接(通常通过 Spring Cloud Connectors)。当设置为true时,此属性指示绑定程序完全忽略绑定的服务,并依赖于 Spring Boot 属性(例如,依赖于环境中为 RabbitMQ 绑定程序提供的spring.rabbitmq.*属性)。此属性的典型用法是嵌套在自定义环境连接到多个系统时中。

默认值:false

  • spring.cloud.stream.bindingRetryInterval

    • 例如,当绑定程序不支持后期绑定并且代理(例如,Apache Kafka)关闭时,重试绑定创建之间的间隔(以秒为单位)。将其设置为零可将此类情况视为致命情况,从而阻止应用程序启动。

默认值:30

31.2 绑定属性

绑定属性通过使用spring.cloud.stream.bindings.<channelName>.<property>=<value>的格式提供。 <channelName>代表正在配置的通道的名称(例如,output表示Source)。

为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.default.<property>=<value>的格式为所有通道设置值。

在避免重复使用扩展绑定属性时,应使用此格式-spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>

在下面的内容中,我们指出了省略了spring.cloud.stream.bindings.<channelName>.前缀的位置,仅着重于属性名称,同时要了解在运行时会包含该前缀。

31.2.1 通用绑定属性

这些属性通过org.springframework.cloud.stream.config.BindingProperties公开

以下绑定属性可用于 Importing 和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.作为前缀(例如spring.cloud.stream.bindings.input.destination=ticktock)。

可以使用spring.cloud.stream.default前缀设置默认值(例如 spring.cloud.stream.default.contentType = application/json)。

  • destination

    • 绑定的中间件上的通道的目标位置(例如,RabbitMQ 交换或 Kafka 主题)。如果将通道绑定为使用者,则可以将其绑定到多个目标,并且目标名称可以指定为逗号分隔的String值。如果未设置,则使用通道名称。此属性的默认值不能被覆盖。
  • group

    • Channel 的消费群体。仅适用于入站绑定。参见Consumer Groups

默认值:null(表示匿名使用者)。

默认值:application/json

默认值:null(使用默认的 Binder,如果存在的话)。

31.2.2Consumer 属性

这些属性通过org.springframework.cloud.stream.binder.ConsumerProperties公开

以下绑定属性仅可用于 Importing 绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.作为前缀(例如spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以使用spring.cloud.stream.default.consumer前缀(例如spring.cloud.stream.default.consumer.headerMode=none)设置默认值。

  • concurrency

    • 入站使用者的并发。

默认值:1

  • partitioned

    • Consumer 是否从分区生产者那里接收数据。

默认值:false

  • headerMode

    • 设置为none时,将禁用 Importing 的 Headers 解析。仅对不支持本地消息头并且需要消息头嵌入的消息中间件有效。当不支持本机头时,使用非 Spring Cloud Stream 应用程序中的数据时,此选项很有用。设置为headers时,它将使用中间件的本机头机制。设置为embeddedHeaders时,它将 Headers 嵌入到消息有效负载中。

默认值:取决于 Binder 的实现。

  • maxAttempts

    • 如果处理失败,则尝试处理消息的次数(包括第一个)。设置为1可禁用重试。

默认值:3

  • backOffInitialInterval

    • 重试时的退避初始间隔。

默认值:1000

  • backOffMaxInterval

    • 最大退避间隔。

默认值:10000

  • backOffMultiplier

    • 退避乘数。

默认值:2.0

  • defaultRetryable

    • retryableExceptions中未列出的侦听器引发的异常是否可以重试。

默认值:true

  • instanceIndex

    • 设置为大于零的值时,它允许自定义此使用者的实例索引(如果与spring.cloud.stream.instanceIndex不同)。设置为负值时,默认为spring.cloud.stream.instanceIndex。有关更多信息,请参见“ 第 34.2 节“实例索引和实例计数””。

默认值:-1

  • instanceCount

    • 设置为大于零的值时,它允许自定义此使用者的实例计数(如果与spring.cloud.stream.instanceCount不同)。设置为负值时,默认为spring.cloud.stream.instanceCount。有关更多信息,请参见“ 第 34.2 节“实例索引和实例计数””。

默认值:-1

  • retryableExceptions

    • 键中 Throwable 类名称的 Map,值中布尔值的 Map。指定将要重试的那些异常(和子类)。另请参见defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

Default: empty.

  • useNativeDecoding

    • 当设置为true时,Client 端库将直接对入站消息进行反序列化,该库必须进行相应配置(例如,设置适当的 Kafka 生产者值反序列化器)。使用此配置时,入站邮件解组不是基于绑定的contentType。使用本机解码时,生产者负责使用适当的编码器(例如,Kafka 生产者值序列化程序)对出站消息进行序列化。另外,使用本机编码和解码时,headerMode=embeddedHeaders属性将被忽略,并且 Headers 不会嵌入消息中。参见生产者属性useNativeEncoding

默认值:false

31.2.3 生产者属性

这些属性通过org.springframework.cloud.stream.binder.ProducerProperties公开

以下绑定属性仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.作为前缀(例如spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id)。

可以使用前缀spring.cloud.stream.default.producer设置默认值(例如spring.cloud.stream.default.producer.partitionKeyExpression=payload.id)。

  • partitionKeyExpression

    • 一个 SpEL 表达式,该表达式确定如何对出站数据进行分区。如果已设置,或者设置了partitionKeyExtractorClass,则会对该通道上的出站数据进行分区。 partitionCount必须设置为大于 1 的值才能生效。与partitionKeyExtractorClass互斥。请参阅“ 第 28.6 节“分区支持””。

Default: null.

  • partitionKeyExtractorClass

    • PartitionKeyExtractorStrategy实现。如果已设置,或者已设置partitionKeyExpression,则会对该通道上的出站数据进行分区。 partitionCount必须设置为大于 1 的值才能生效。与partitionKeyExpression互斥。请参阅“ 第 28.6 节“分区支持””。

默认值:null

  • partitionSelectorClass

    • PartitionSelectorStrategy实现。与partitionSelectorExpression互斥。如果两者均未设置,则将分区选择为hashCode(key) % partitionCount,其中key通过partitionKeyExpressionpartitionKeyExtractorClass计算。

默认值:null

  • partitionSelectorExpression

    • 用于自定义分区选择的 SpEL 表达式。与partitionSelectorClass互斥。如果两者均未设置,则将分区选择为hashCode(key) % partitionCount,其中key通过partitionKeyExpressionpartitionKeyExtractorClass计算。

默认值:null

  • partitionCount

    • 数据的目标分区数(如果启用了分区)。如果生产者已分区,则必须将其设置为大于 1 的值。在 Kafka 上,它被解释为提示。取其较大者,并使用目标主题的分区数。

默认值:1

  • requiredGroups

    • 生产者必须确保将消息传递到的组的逗号分隔列表,即使它们是在创建消息之后开始的(例如,通过在 RabbitMQ 中预先创建持久队列)。
  • headerMode

    • 设置为none时,它将禁用在输出中嵌入 Headers。它仅对本身不支持消息头并且需要消息头嵌入的消息中间件有效。当不支持本机头时,在为非 Spring Cloud Stream 应用程序生成数据时,此选项很有用。设置为headers时,它将使用中间件的本机头机制。设置为embeddedHeaders时,它将 Headers 嵌入到消息有效负载中。

默认值:取决于 Binder 的实现。

  • useNativeEncoding

    • 当设置为true时,出站消息将直接由 Client 端库进行序列化,Client 端库必须进行相应配置(例如,设置适当的 Kafka 生产者值序列化程序)。使用此配置时,出站邮件编组不是基于绑定的contentType。使用本机编码时,Consumer 有责任使用适当的解码器(例如,KafkaConsumer 值反序列化器)对入站消息进行反序列化。另外,使用本机编码和解码时,headerMode=embeddedHeaders属性将被忽略,并且 Headers 不会嵌入消息中。参见 Consumer 属性useNativeDecoding

默认值:false

  • errorChannelEnabled

    • 设置为true时,如果 Binder 支持异步发送结果,则发送失败将发送到目标的错误通道。有关更多信息,请参见“ ???”。

默认值:false

31.3 使用动态绑定的目的地

除了使用@EnableBinding定义的通道外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目的地。例如,当需要在运行时确定目标目的地时,这很有用。应用程序可以使用由@EnableBinding注解自动注册的BinderAwareChannelResolver bean 来实现。

“ spring.cloud.stream.dynamicDestinations”属性可用于将动态目标名称限制为已知集合(白名单)。如果未设置此属性,则可以动态绑定任何目标。

BinderAwareChannelResolver可以直接使用,如以下使用路径变量来确定目标通道的 REST 控制器示例所示:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @PathVariable("target") target,
           @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, target, contentType);
    }

    private void sendMessage(String body, String target, Object contentType) {
        resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}

现在考虑当我们在默认端口(8080)上启动应用程序并使用 CURL 发出以下请求时会发生什么:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

在代理中创建目的地(“Client”和“订单”)(在 Rabbit 的交换中或在 Kafka 的主题中),名称为“Client”和“订单”,并将数据发布到适当的目的地。

BinderAwareChannelResolver是通用的 Spring Integration DestinationResolver,并且可以注入到其他组件中,例如,在 Router 中使用 SpEL 表达式,该表达式基于传入 JSON 消息的target字段。以下示例包含一个读取 SpEL 表达式的 Router:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/", method = POST, consumes = "application/json")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, contentType);
    }

    private void sendMessage(Object body, Object contentType) {
        routerChannel().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

    @Bean(name = "routerChannel")
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }
}

Router 接收器应用使用此技术按需创建目的地。

如果预先知道通道名称,则可以像其他任何目的地一样配置生产者属性。或者,如果您注册NewBindingCallback<> bean,则在创建绑定之前调用它。回调采用绑定程序使用的扩展生产者属性的通用类型。它有一种方法:

void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例显示了如何使用 RabbitMQ 绑定器:

@Bean
public NewBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}

Note

如果需要支持具有多种 Binder 类型的动态目标,请对通用类型使用Object并根据需要强制转换extended参数。