28. Configuration 选项
Spring Cloud Stream 支持常规 configuration 选项以及绑定和 binders 的 configuration。一些 binders 允许额外的 binding properties 来支持 middleware-specific features。
Configuration 选项可以通过 Spring Boot 支持的任何机制提供给 Spring Cloud Stream applications。这包括 application arguments,环境变量和 YAML 或.properties files。
28.1 Spring Cloud Stream Properties
-
spring.cloud.stream.instanceCount
- application 的已部署实例数。必须设置为分区,如果使用 Kafka。
默认值:1
。
-
spring.cloud.stream.instanceIndex
- application 的实例索引:从
0
到instanceCount
-1 的数字。用于分区和 Kafka。自动在 Cloud Foundry 中设置匹配 application 的实例索引。
- application 的实例索引:从
-
spring.cloud.stream.dynamicDestinations
- 可动态绑定的目标列表(例如,在动态路由方案中)。如果设置,则只能绑定列出的目标。
默认值:空(允许绑定任何目标)。
-
spring.cloud.stream.defaultBinder
- 如果配置了多个 binder,则使用默认的 binder。见Classpath 上有多个 Binder。
默认值:空。
-
spring.cloud.stream.overrideCloudConnectors
- 此 property 仅适用于
cloud
profile 为 active 且 Spring Cloud 连接器随 application 一起提供的情况。如果 property 是 false(默认值),binder 将检测一个合适的绑定服务(e.g. 为 RabbitMQ binder 绑定在 Cloud Foundry 中的 RabbitMQ 服务)并将其用于 creating 连接(通常通过 Spring Cloud 连接器)。当设置为 true 时,此 property 指示 binders 完全忽略绑定服务并依赖 Spring Boot properties(e.g. 依赖于 RabbitMQ binder 环境中提供的spring.rabbitmq.*
properties)。此 property 的典型用法是嵌套在自定义环境当连接到多个系统时中。
- 此 property 仅适用于
默认值:false。
28.2 Binding Properties
Binding properties 使用spring.cloud.stream.bindings.<channelName>.<property>=<value>
格式提供。 <channelName>
表示正在配置的 channel 的 name(e.g. ,output
表示Source
)。
为避免重复,Spring Cloud Stream 支持所有 channel 的设置值,格式为spring.cloud.stream.default.<property>=<value>
。
在下文中,我们指出我们在哪里省略了spring.cloud.stream.bindings.<channelName>.
前缀并仅关注 property name,并理解前缀将包含在运行时。
28.2.1 使用 Spring Cloud Stream 的属性
以下 binding properties 可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.
,e.g 为前缀。 spring.cloud.stream.bindings.input.destination=ticktock
。
可以使用前缀spring.cloud.stream.default
,e.g 设置默认值。 spring.cloud.stream.default.contentType=application/json
。
-
目的地
- 绑定中间件(e.g. ,RabbitMQ 交换或 Kafka topic)上 channel 的目标目标。如果 channel 绑定为 consumer,则它可以绑定到多个目标,并且可以将目标名称指定为逗号分隔的 String 值。如果未设置,则使用 channel name。无法覆盖此 property 的默认 value。
-
组
- channel 的 consumer group。仅适用于入站绑定。见Consumer Groups。
默认值:null(表示匿名 consumer)。
-
内容类型
- channel 的 content type。
默认值:null(这样就不会执行类型强制)。
-
binder
- binding 使用的 binder。有关详细信息,请参阅第 27.4 节,“Classpath 上的多个 Binder”。
默认值:null(将使用默认的 binder,如果存在的话)。
28.2.2 Consumer properties
以下 binding properties 仅可用于输入绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.
,e.g 为前缀。 spring.cloud.stream.bindings.input.consumer.concurrency=3
。
可以使用前缀spring.cloud.stream.default.consumer
,e.g 设置默认值。 spring.cloud.stream.default.consumer.headerMode=raw
。
-
并发
- 入站 consumer 的并发性。
默认值:1
。
-
分区
- consumer 是否从分区的 producer 接收数据。
默认值:false
。
-
headerMode
- 设置为
raw
时,禁用输入上的标头解析。仅适用于本机不支持消息 headers 且需要标头嵌入的消息传递中间件。入站数据来自外部 Spring Cloud Stream applications 时很有用。
- 设置为
默认值:embeddedHeaders
。
-
maxAttempts
- 如果处理失败,则尝试处理消息(包括第一个)。设置为 1 以禁用重试。
默认值:3
。
-
backOffInitialInterval
- 重试时的退避初始间隔。
默认值:1000
。
-
backOffMaxInterval
- 最大退避间隔。
默认值:10000
。
-
backOffMultiplier
- 退避乘数。
默认值:2.0
。
-
instanceIndex
- 当设置为大于等于零的 value 时,允许自定义此 consumer 的实例索引(如果与
spring.cloud.stream.instanceIndex
不同)。设置为负 value 时,默认为spring.cloud.stream.instanceIndex
。
- 当设置为大于等于零的 value 时,允许自定义此 consumer 的实例索引(如果与
默认值:-1
。
-
instanceCount
- 当设置为大于等于零的 value 时,允许自定义此 consumer 的实例计数(如果与
spring.cloud.stream.instanceCount
不同)。设置为负 value 时,默认为spring.cloud.stream.instanceCount
。
- 当设置为大于等于零的 value 时,允许自定义此 consumer 的实例计数(如果与
默认值:-1
。
28.2.3 Producer Properties
以下 binding properties 仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.
,e.g 为前缀。 spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id
。
可以使用前缀spring.cloud.stream.default.producer
,e.g 设置默认值。 spring.cloud.stream.default.producer.partitionKeyExpression=payload.id
。
-
partitionKeyExpression
- 一个 SpEL 表达式,用于确定如何对出站数据进行分区。如果设置,或者设置了
partitionKeyExtractorClass
,则此 channel 上的出站数据将被分区,并且partitionCount
必须设置为大于 1 的 value 才能生效。这两个选项是互斥的。见第 25.5 节,“分区支持”。
- 一个 SpEL 表达式,用于确定如何对出站数据进行分区。如果设置,或者设置了
默认值:null。
-
partitionKeyExtractorClass
- 一个
PartitionKeyExtractorStrategy
实现。如果设置,或者设置了partitionKeyExpression
,则此 channel 上的出站数据将被分区,并且partitionCount
必须设置为大于 1 的 value 才能生效。这两个选项是互斥的。见第 25.5 节,“分区支持”。
- 一个
默认值:null。
-
partitionSelectorClass
- 一个
PartitionSelectorStrategy
实现。与partitionSelectorExpression
互斥。如果两者都未设置,则分区将被选为hashCode(key) % partitionCount
,其中key
通过partitionKeyExpression
或partitionKeyExtractorClass
计算。
- 一个
默认值:null。
-
partitionSelectorExpression
- 用于自定义分区选择的 SpEL 表达式。与
partitionSelectorClass
互斥。如果两者都未设置,则分区将被选为hashCode(key) % partitionCount
,其中key
通过partitionKeyExpression
或partitionKeyExtractorClass
计算。
- 用于自定义分区选择的 SpEL 表达式。与
默认值:null。
-
partitionCount
- 如果启用了分区,则数据的目标分区数。如果 producer 被分区,则必须设置为大于 1 的 value。在 Kafka 上,被解释为暗示;更大的这个和目标 topic 的分区数用来代替。
默认值:1
。
-
requiredGroups
-
headerMode
- 设置为
raw
时,禁用输出上的标头嵌入。仅适用于本机不支持消息 headers 且需要标头嵌入的消息传递中间件。 生成 non-Spring Cloud Stream applications 的数据时很有用。
- 设置为
默认值:embeddedHeaders
。
-
useNativeEncoding
- 设置为
true
时,出站消息由 client library 直接序列化,必须相应地进行配置(e.g. 设置适当的 Kafka producer value 序列化程序)。使用此 configuration 时,出站邮件编组不基于 binding 的contentType
。当使用本机编码时,consumer 负责使用适当的解码器(例如:Kafka consumer value de-serializer)来反序列化入站消息。此外,当使用本机 encoding/decoding 时,headerMode
property 将被忽略,headers 将不会嵌入到消息中。
- 设置为
默认值:false
。
-
errorChannelEnabled
- 设置为
true
时,如果 binder 支持异步发送结果;发送失败将发送到目标的错误 channel。有关更多信息,请参见名为“Message Channel Binder and Error Channels”的部分。
- 设置为
默认值:false
。
28.3 使用动态绑定目标
除了通过@EnableBinding
定义的 channels 之外,Spring Cloud Stream 允许 applications 将消息发送到动态绑定目的地。例如,当需要在运行时确定目标目标时,这很有用。 Applications 可以使用由@EnableBinding
annotation 自动注册的BinderAwareChannelResolver
bean 来实现。
property'spring.cloud.stream.dynamicDestinations'可用于将动态目标名称限制为事先已知的集合(白名单)。如果未设置 property,则可以动态绑定任何目标。
BinderAwareChannelResolver
可以直接使用,如下面的示例所示,其中 REST 控制器使用路径变量来决定目标 channel。
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @PathVariable("target") target,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, target, contentType);
}
private void sendMessage(String body, String target, Object contentType) {
resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
}
在默认 port 8080 上启动 application 后,发送以下数据时:
curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers
curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders
目的地“客户”和“订单”在 broker 中创建(对于示例:在 Rabbit 情况下为 Exchange,如果是 Kafka 则为 topic),其名称为“customers”和“orders”,数据将发布到相应的目的地。
BinderAwareChannelResolver
是一个通用的 Spring Integration DestinationResolver
,可以注入其他组件。对于 example,在 router 中使用基于传入 JSON 消息的target
字段的 SpEL 表达式。
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/", method = POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, contentType);
}
private void sendMessage(Object body, Object contentType) {
routerChannel().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "routerChannel")
public MessageChannel routerChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "routerChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router =
new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
}