28. Configuration 选项

Spring Cloud Stream 支持常规 configuration 选项以及绑定和 binders 的 configuration。一些 binders 允许额外的 binding properties 来支持 middleware-specific features。

Configuration 选项可以通过 Spring Boot 支持的任何机制提供给 Spring Cloud Stream applications。这包括 application arguments,环境变量和 YAML 或.properties files。

28.1 Spring Cloud Stream Properties

  • spring.cloud.stream.instanceCount

    • application 的已部署实例数。必须设置为分区,如果使用 Kafka。

默认值:1

  • spring.cloud.stream.instanceIndex

    • application 的实例索引:从0instanceCount -1 的数字。用于分区和 Kafka。自动在 Cloud Foundry 中设置匹配 application 的实例索引。
  • spring.cloud.stream.dynamicDestinations

    • 可动态绑定的目标列表(例如,在动态路由方案中)。如果设置,则只能绑定列出的目标。

默认值:空(允许绑定任何目标)。

默认值:空。

  • spring.cloud.stream.overrideCloudConnectors

    • 此 property 仅适用于cloud profile 为 active 且 Spring Cloud 连接器随 application 一起提供的情况。如果 property 是 false(默认值),binder 将检测一个合适的绑定服务(e.g. 为 RabbitMQ binder 绑定在 Cloud Foundry 中的 RabbitMQ 服务)并将其用于 creating 连接(通常通过 Spring Cloud 连接器)。当设置为 true 时,此 property 指示 binders 完全忽略绑定服务并依赖 Spring Boot properties(e.g. 依赖于 RabbitMQ binder 环境中提供的spring.rabbitmq.* properties)。此 property 的典型用法是嵌套在自定义环境当连接到多个系统时中。

默认值:false。

28.2 Binding Properties

Binding properties 使用spring.cloud.stream.bindings.<channelName>.<property>=<value>格式提供。 <channelName>表示正在配置的 channel 的 name(e.g. ,output表示Source)。

为避免重复,Spring Cloud Stream 支持所有 channel 的设置值,格式为spring.cloud.stream.default.<property>=<value>

在下文中,我们指出我们在哪里省略了spring.cloud.stream.bindings.<channelName>.前缀并仅关注 property name,并理解前缀将包含在运行时。

28.2.1 使用 Spring Cloud Stream 的属性

以下 binding properties 可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.,e.g 为前缀。 spring.cloud.stream.bindings.input.destination=ticktock

可以使用前缀spring.cloud.stream.default,e.g 设置默认值。 spring.cloud.stream.default.contentType=application/json

  • 目的地

    • 绑定中间件(e.g. ,RabbitMQ 交换或 Kafka topic)上 channel 的目标目标。如果 channel 绑定为 consumer,则它可以绑定到多个目标,并且可以将目标名称指定为逗号分隔的 String 值。如果未设置,则使用 channel name。无法覆盖此 property 的默认 value。
    • channel 的 consumer group。仅适用于入站绑定。见Consumer Groups

默认值:null(表示匿名 consumer)。

  • 内容类型

    • channel 的 content type。

默认值:null(这样就不会执行类型强制)。

默认值:null(将使用默认的 binder,如果存在的话)。

28.2.2 Consumer properties

以下 binding properties 仅可用于输入绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.,e.g 为前缀。 spring.cloud.stream.bindings.input.consumer.concurrency=3

可以使用前缀spring.cloud.stream.default.consumer,e.g 设置默认值。 spring.cloud.stream.default.consumer.headerMode=raw

  • 并发

    • 入站 consumer 的并发性。

默认值:1

  • 分区

    • consumer 是否从分区的 producer 接收数据。

默认值:false

  • headerMode

    • 设置为raw时,禁用输入上的标头解析。仅适用于本机不支持消息 headers 且需要标头嵌入的消息传递中间件。入站数据来自外部 Spring Cloud Stream applications 时很有用。

默认值:embeddedHeaders

  • maxAttempts

    • 如果处理失败,则尝试处理消息(包括第一个)。设置为 1 以禁用重试。

默认值:3

  • backOffInitialInterval

    • 重试时的退避初始间隔。

默认值:1000

  • backOffMaxInterval

    • 最大退避间隔。

默认值:10000

  • backOffMultiplier

    • 退避乘数。

默认值:2.0

  • instanceIndex

    • 当设置为大于等于零的 value 时,允许自定义此 consumer 的实例索引(如果与spring.cloud.stream.instanceIndex不同)。设置为负 value 时,默认为spring.cloud.stream.instanceIndex

默认值:-1

  • instanceCount

    • 当设置为大于等于零的 value 时,允许自定义此 consumer 的实例计数(如果与spring.cloud.stream.instanceCount不同)。设置为负 value 时,默认为spring.cloud.stream.instanceCount

默认值:-1

28.2.3 Producer Properties

以下 binding properties 仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.,e.g 为前缀。 spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id

可以使用前缀spring.cloud.stream.default.producer,e.g 设置默认值。 spring.cloud.stream.default.producer.partitionKeyExpression=payload.id

  • partitionKeyExpression

    • 一个 SpEL 表达式,用于确定如何对出站数据进行分区。如果设置,或者设置了partitionKeyExtractorClass,则此 channel 上的出站数据将被分区,并且partitionCount必须设置为大于 1 的 value 才能生效。这两个选项是互斥的。见第 25.5 节,“分区支持”

默认值:null。

  • partitionKeyExtractorClass

    • 一个PartitionKeyExtractorStrategy 实现。如果设置,或者设置了partitionKeyExpression,则此 channel 上的出站数据将被分区,并且partitionCount必须设置为大于 1 的 value 才能生效。这两个选项是互斥的。见第 25.5 节,“分区支持”

默认值:null。

  • partitionSelectorClass

    • 一个PartitionSelectorStrategy 实现。与partitionSelectorExpression互斥。如果两者都未设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpressionpartitionKeyExtractorClass计算。

默认值:null。

  • partitionSelectorExpression

    • 用于自定义分区选择的 SpEL 表达式。与partitionSelectorClass互斥。如果两者都未设置,则分区将被选为hashCode(key) % partitionCount,其中key通过partitionKeyExpressionpartitionKeyExtractorClass计算。

默认值:null。

  • partitionCount

    • 如果启用了分区,则数据的目标分区数。如果 producer 被分区,则必须设置为大于 1 的 value。在 Kafka 上,被解释为暗示;更大的这个和目标 topic 的分区数用来代替。

默认值:1

  • requiredGroups

  • headerMode

    • 设置为raw时,禁用输出上的标头嵌入。仅适用于本机不支持消息 headers 且需要标头嵌入的消息传递中间件。 生成 non-Spring Cloud Stream applications 的数据时很有用。

默认值:embeddedHeaders

  • useNativeEncoding

    • 设置为true时,出站消息由 client library 直接序列化,必须相应地进行配置(e.g. 设置适当的 Kafka producer value 序列化程序)。使用此 configuration 时,出站邮件编组不基于 binding 的contentType。当使用本机编码时,consumer 负责使用适当的解码器(例如:Kafka consumer value de-serializer)来反序列化入站消息。此外,当使用本机 encoding/decoding 时,headerMode property 将被忽略,headers 将不会嵌入到消息中。

默认值:false

默认值:false

28.3 使用动态绑定目标

除了通过@EnableBinding定义的 channels 之外,Spring Cloud Stream 允许 applications 将消息发送到动态绑定目的地。例如,当需要在运行时确定目标目标时,这很有用。 Applications 可以使用由@EnableBinding annotation 自动注册的BinderAwareChannelResolver bean 来实现。

property'spring.cloud.stream.dynamicDestinations'可用于将动态目标名称限制为事先已知的集合(白名单)。如果未设置 property,则可以动态绑定任何目标。

BinderAwareChannelResolver可以直接使用,如下面的示例所示,其中 REST 控制器使用路径变量来决定目标 channel。

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

	@Autowired
	private BinderAwareChannelResolver resolver;

	@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @PathVariable("target") target,
	       @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, target, contentType);
	}

	private void sendMessage(String body, String target, Object contentType) {
		resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}
}

在默认 port 8080 上启动 application 后,发送以下数据时:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

目的地“客户”和“订单”在 broker 中创建(对于示例:在 Rabbit 情况下为 Exchange,如果是 Kafka 则为 topic),其名称为“customers”和“orders”,数据将发布到相应的目的地。

BinderAwareChannelResolver是一个通用的 Spring Integration DestinationResolver,可以注入其他组件。对于 example,在 router 中使用基于传入 JSON 消息的target字段的 SpEL 表达式。

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

	@Autowired
	private BinderAwareChannelResolver resolver;

	@RequestMapping(path = "/", method = POST, consumes = "application/json")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, contentType);
	}

	private void sendMessage(Object body, Object contentType) {
		routerChannel().send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}

	@Bean(name = "routerChannel")
	public MessageChannel routerChannel() {
		return new DirectChannel();
	}

	@Bean
	@ServiceActivator(inputChannel = "routerChannel")
	public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
		router.setDefaultOutputChannelName("default-output");
		router.setChannelResolver(resolver);
		return router;
	}
}