31. Configuration Options
Spring Cloud Stream 支持常规配置选项以及绑定和 Binder 的配置。一些绑定器使其他绑定属性支持特定于中间件的功能。
可以通过 Spring Boot 支持的任何机制将配置选项提供给 Spring Cloud Stream 应用程序。这包括应用程序参数,环境变量以及 YAML 或.properties 文件。
31.1 绑定服务属性
这些属性通过org.springframework.cloud.stream.config.BindingServiceProperties
公开
-
spring.cloud.stream.instanceCount
- 应用程序已部署实例的数量。必须在生产者端进行分区设置。如果使用
autoRebalanceEnabled=false
,则必须在使用 RabbitMQ 和 Kafka 时在用户端设置。
- 应用程序已部署实例的数量。必须在生产者端进行分区设置。如果使用
默认值:1
。
-
spring.cloud.stream.instanceIndex
- 应用程序的实例索引:
0
到instanceCount - 1
之间的数字。用于通过 RabbitMQ 和autoRebalanceEnabled=false
与 Kafka 进行分区。在 Cloud Foundry 中自动设置以匹配应用程序的实例索引。
- 应用程序的实例索引:
-
spring.cloud.stream.dynamicDestinations
- 可以动态绑定的目的地列表(例如,在动态路由方案中)。如果设置,则只能绑定列出的目的地。
默认值:空(将任何目的地绑定)。
-
spring.cloud.stream.defaultBinder
- 如果配置了多个联编程序,则使用的默认联编程序。参见Classpath 上的多个绑定器。
Default: empty.
-
spring.cloud.stream.overrideCloudConnectors
- 仅当
cloud
配置文件处于Active状态并且该应用程序提供了 Spring Cloud Connector 时,此属性才适用。如果属性为false
(默认值),则绑定器检测到合适的绑定服务(例如,在 Cloud Foundry 中为 RabbitMQ 绑定器绑定的 RabbitMQ 服务)并将其用于创建连接(通常通过 Spring Cloud Connectors)。当设置为true
时,此属性指示绑定程序完全忽略绑定的服务,并依赖于 Spring Boot 属性(例如,依赖于环境中为 RabbitMQ 绑定程序提供的spring.rabbitmq.*
属性)。此属性的典型用法是嵌套在自定义环境连接到多个系统时中。
- 仅当
默认值:false
。
-
spring.cloud.stream.bindingRetryInterval
- 例如,当绑定程序不支持后期绑定并且代理(例如,Apache Kafka)关闭时,重试绑定创建之间的间隔(以秒为单位)。将其设置为零可将此类情况视为致命情况,从而阻止应用程序启动。
默认值:30
31.2 绑定属性
绑定属性通过使用spring.cloud.stream.bindings.<channelName>.<property>=<value>
的格式提供。 <channelName>
代表正在配置的通道的名称(例如,output
表示Source
)。
为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.default.<property>=<value>
的格式为所有通道设置值。
在避免重复使用扩展绑定属性时,应使用此格式-spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>
。
在下面的内容中,我们指出了省略了spring.cloud.stream.bindings.<channelName>.
前缀的位置,仅着重于属性名称,同时要了解在运行时会包含该前缀。
31.2.1 通用绑定属性
这些属性通过org.springframework.cloud.stream.config.BindingProperties
公开
以下绑定属性可用于 Importing 和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.
作为前缀(例如spring.cloud.stream.bindings.input.destination=ticktock
)。
可以使用spring.cloud.stream.default
前缀设置默认值(例如 spring.cloud.stream.default.contentType = application/json)。
-
destination
- 绑定的中间件上的通道的目标位置(例如,RabbitMQ 交换或 Kafka 主题)。如果将通道绑定为使用者,则可以将其绑定到多个目标,并且目标名称可以指定为逗号分隔的
String
值。如果未设置,则使用通道名称。此属性的默认值不能被覆盖。
- 绑定的中间件上的通道的目标位置(例如,RabbitMQ 交换或 Kafka 主题)。如果将通道绑定为使用者,则可以将其绑定到多个目标,并且目标名称可以指定为逗号分隔的
-
group
- Channel 的消费群体。仅适用于入站绑定。参见Consumer Groups。
默认值:null
(表示匿名使用者)。
-
contentType
- Channels 的 Content Type。请参阅“ 第 32 章,Content Type 协商”。
默认值:application/json
。
-
binder
- 此绑定使用的粘合剂。有关详细信息,请参见“ 第 30.4 节“Classpath 上的多个绑定器””。
默认值:null
(使用默认的 Binder,如果存在的话)。
31.2.2Consumer 属性
这些属性通过org.springframework.cloud.stream.binder.ConsumerProperties
公开
以下绑定属性仅可用于 Importing 绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.
作为前缀(例如spring.cloud.stream.bindings.input.consumer.concurrency=3
)。
可以使用spring.cloud.stream.default.consumer
前缀(例如spring.cloud.stream.default.consumer.headerMode=none
)设置默认值。
-
concurrency
- 入站使用者的并发。
默认值:1
。
-
partitioned
- Consumer 是否从分区生产者那里接收数据。
默认值:false
。
-
headerMode
- 设置为
none
时,将禁用 Importing 的 Headers 解析。仅对不支持本地消息头并且需要消息头嵌入的消息中间件有效。当不支持本机头时,使用非 Spring Cloud Stream 应用程序中的数据时,此选项很有用。设置为headers
时,它将使用中间件的本机头机制。设置为embeddedHeaders
时,它将 Headers 嵌入到消息有效负载中。
- 设置为
默认值:取决于 Binder 的实现。
-
maxAttempts
- 如果处理失败,则尝试处理消息的次数(包括第一个)。设置为
1
可禁用重试。
- 如果处理失败,则尝试处理消息的次数(包括第一个)。设置为
默认值:3
。
-
backOffInitialInterval
- 重试时的退避初始间隔。
默认值:1000
。
-
backOffMaxInterval
- 最大退避间隔。
默认值:10000
。
-
backOffMultiplier
- 退避乘数。
默认值:2.0
。
-
defaultRetryable
retryableExceptions
中未列出的侦听器引发的异常是否可以重试。
默认值:true
。
-
instanceIndex
- 设置为大于零的值时,它允许自定义此使用者的实例索引(如果与
spring.cloud.stream.instanceIndex
不同)。设置为负值时,默认为spring.cloud.stream.instanceIndex
。有关更多信息,请参见“ 第 34.2 节“实例索引和实例计数””。
- 设置为大于零的值时,它允许自定义此使用者的实例索引(如果与
默认值:-1
。
-
instanceCount
- 设置为大于零的值时,它允许自定义此使用者的实例计数(如果与
spring.cloud.stream.instanceCount
不同)。设置为负值时,默认为spring.cloud.stream.instanceCount
。有关更多信息,请参见“ 第 34.2 节“实例索引和实例计数””。
- 设置为大于零的值时,它允许自定义此使用者的实例计数(如果与
默认值:-1
。
-
retryableExceptions
- 键中 Throwable 类名称的 Map,值中布尔值的 Map。指定将要重试的那些异常(和子类)。另请参见
defaultRetriable
。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false
。
- 键中 Throwable 类名称的 Map,值中布尔值的 Map。指定将要重试的那些异常(和子类)。另请参见
Default: empty.
-
useNativeDecoding
- 当设置为
true
时,Client 端库将直接对入站消息进行反序列化,该库必须进行相应配置(例如,设置适当的 Kafka 生产者值反序列化器)。使用此配置时,入站邮件解组不是基于绑定的contentType
。使用本机解码时,生产者负责使用适当的编码器(例如,Kafka 生产者值序列化程序)对出站消息进行序列化。另外,使用本机编码和解码时,headerMode=embeddedHeaders
属性将被忽略,并且 Headers 不会嵌入消息中。参见生产者属性useNativeEncoding
。
- 当设置为
默认值:false
。
31.2.3 生产者属性
这些属性通过org.springframework.cloud.stream.binder.ProducerProperties
公开
以下绑定属性仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.
作为前缀(例如spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id
)。
可以使用前缀spring.cloud.stream.default.producer
设置默认值(例如spring.cloud.stream.default.producer.partitionKeyExpression=payload.id
)。
-
partitionKeyExpression
- 一个 SpEL 表达式,该表达式确定如何对出站数据进行分区。如果已设置,或者设置了
partitionKeyExtractorClass
,则会对该通道上的出站数据进行分区。partitionCount
必须设置为大于 1 的值才能生效。与partitionKeyExtractorClass
互斥。请参阅“ 第 28.6 节“分区支持””。
- 一个 SpEL 表达式,该表达式确定如何对出站数据进行分区。如果已设置,或者设置了
Default: null.
-
partitionKeyExtractorClass
PartitionKeyExtractorStrategy
实现。如果已设置,或者已设置partitionKeyExpression
,则会对该通道上的出站数据进行分区。partitionCount
必须设置为大于 1 的值才能生效。与partitionKeyExpression
互斥。请参阅“ 第 28.6 节“分区支持””。
默认值:null
。
-
partitionSelectorClass
PartitionSelectorStrategy
实现。与partitionSelectorExpression
互斥。如果两者均未设置,则将分区选择为hashCode(key) % partitionCount
,其中key
通过partitionKeyExpression
或partitionKeyExtractorClass
计算。
默认值:null
。
-
partitionSelectorExpression
- 用于自定义分区选择的 SpEL 表达式。与
partitionSelectorClass
互斥。如果两者均未设置,则将分区选择为hashCode(key) % partitionCount
,其中key
通过partitionKeyExpression
或partitionKeyExtractorClass
计算。
- 用于自定义分区选择的 SpEL 表达式。与
默认值:null
。
-
partitionCount
- 数据的目标分区数(如果启用了分区)。如果生产者已分区,则必须将其设置为大于 1 的值。在 Kafka 上,它被解释为提示。取其较大者,并使用目标主题的分区数。
默认值:1
。
-
requiredGroups
- 生产者必须确保将消息传递到的组的逗号分隔列表,即使它们是在创建消息之后开始的(例如,通过在 RabbitMQ 中预先创建持久队列)。
-
headerMode
- 设置为
none
时,它将禁用在输出中嵌入 Headers。它仅对本身不支持消息头并且需要消息头嵌入的消息中间件有效。当不支持本机头时,在为非 Spring Cloud Stream 应用程序生成数据时,此选项很有用。设置为headers
时,它将使用中间件的本机头机制。设置为embeddedHeaders
时,它将 Headers 嵌入到消息有效负载中。
- 设置为
默认值:取决于 Binder 的实现。
-
useNativeEncoding
- 当设置为
true
时,出站消息将直接由 Client 端库进行序列化,Client 端库必须进行相应配置(例如,设置适当的 Kafka 生产者值序列化程序)。使用此配置时,出站邮件编组不是基于绑定的contentType
。使用本机编码时,Consumer 有责任使用适当的解码器(例如,KafkaConsumer 值反序列化器)对入站消息进行反序列化。另外,使用本机编码和解码时,headerMode=embeddedHeaders
属性将被忽略,并且 Headers 不会嵌入消息中。参见 Consumer 属性useNativeDecoding
。
- 当设置为
默认值:false
。
-
errorChannelEnabled
- 设置为
true
时,如果 Binder 支持异步发送结果,则发送失败将发送到目标的错误通道。有关更多信息,请参见“ ???”。
- 设置为
默认值:false
。
31.3 使用动态绑定的目的地
除了使用@EnableBinding
定义的通道外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目的地。例如,当需要在运行时确定目标目的地时,这很有用。应用程序可以使用由@EnableBinding
注解自动注册的BinderAwareChannelResolver
bean 来实现。
“ spring.cloud.stream.dynamicDestinations”属性可用于将动态目标名称限制为已知集合(白名单)。如果未设置此属性,则可以动态绑定任何目标。
BinderAwareChannelResolver
可以直接使用,如以下使用路径变量来确定目标通道的 REST 控制器示例所示:
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @PathVariable("target") target,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, target, contentType);
}
private void sendMessage(String body, String target, Object contentType) {
resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
}
现在考虑当我们在默认端口(8080)上启动应用程序并使用 CURL 发出以下请求时会发生什么:
curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers
curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders
在代理中创建目的地(“Client”和“订单”)(在 Rabbit 的交换中或在 Kafka 的主题中),名称为“Client”和“订单”,并将数据发布到适当的目的地。
BinderAwareChannelResolver
是通用的 Spring Integration DestinationResolver
,并且可以注入到其他组件中,例如,在 Router 中使用 SpEL 表达式,该表达式基于传入 JSON 消息的target
字段。以下示例包含一个读取 SpEL 表达式的 Router:
@EnableBinding
@Controller
public class SourceWithDynamicDestination {
@Autowired
private BinderAwareChannelResolver resolver;
@RequestMapping(path = "/", method = POST, consumes = "application/json")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
sendMessage(body, contentType);
}
private void sendMessage(Object body, Object contentType) {
routerChannel().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
@Bean(name = "routerChannel")
public MessageChannel routerChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "routerChannel")
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router =
new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
router.setDefaultOutputChannelName("default-output");
router.setChannelResolver(resolver);
return router;
}
}
Router 接收器应用使用此技术按需创建目的地。
如果预先知道通道名称,则可以像其他任何目的地一样配置生产者属性。或者,如果您注册NewBindingCallback<>
bean,则在创建绑定之前调用它。回调采用绑定程序使用的扩展生产者属性的通用类型。它有一种方法:
void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
以下示例显示了如何使用 RabbitMQ 绑定器:
@Bean
public NewBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
Note
如果需要支持具有多种 Binder 类型的动态目标,请对通用类型使用Object
并根据需要强制转换extended
参数。