29. Configuration 选项

Spring Cloud Stream 支持常规 configuration 选项以及绑定和 binders 的 configuration。一些 binders 允许额外的 binding properties 支持 middleware-specific features。

Configuration 选项可以通过 Spring Boot 支持的任何机制提供给 Spring Cloud Stream applications。这包括 application arguments,环境变量和 YAML 或.properties files。

29.1 Binding Service Properties

这些 properties 通过org.springframework.cloud.stream.config.BindingServiceProperties公开

默认值:1

默认值:空(允许绑定任何目标)。

默认值:空。

默认值:false

默认值:30

29.2 Binding Properties

Binding properties 使用spring.cloud.stream.bindings.<channelName>.<property>=<value>的格式提供。 <channelName>表示正在配置的 channel 的 name(对于 example,output表示Source)。

为避免重复,Spring Cloud Stream 支持所有 channel 的设置值,格式为spring.cloud.stream.default.<property>=<value>

在下文中,我们指出我们在哪里省略了spring.cloud.stream.bindings.<channelName>.前缀并仅关注 property name,并理解前缀 ise 包含在运行时。

29.2.1 Common Binding Properties

这些 properties 通过org.springframework.cloud.stream.config.BindingProperties公开

以下 binding properties 可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.为前缀(对于 example,spring.cloud.stream.bindings.input.destination=ticktock)。

可以使用spring.cloud.stream.default前缀(对于 example spring.cloud.stream.default.contentType=application/json)设置默认值。

默认值:null(表示匿名 consumer)。

默认值:null(不执行类型强制)。

默认值:null(使用默认的 binder,如果存在)。

29.2.2 Consumer Properties

这些 properties 通过org.springframework.cloud.stream.binder.ConsumerProperties公开

以下 binding properties 仅可用于输入绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.为前缀(对于 example,spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以使用spring.cloud.stream.default.consumer前缀设置默认值(对于 example,spring.cloud.stream.default.consumer.headerMode=none)。

默认值:1

默认值:false

默认值:取决于 binder implementation。

默认值:3

默认值:1000

默认值:10000

默认值:2.0

默认值:-1

默认值:-1

默认值:false

29.2.3 Producer Properties

这些 properties 通过org.springframework.cloud.stream.binder.ProducerProperties公开

以下 binding properties 仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.为前缀(对于 example,spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id)。

可以使用前缀spring.cloud.stream.default.producer(对于 example,spring.cloud.stream.default.producer.partitionKeyExpression=payload.id)设置默认值。

默认值:null。

默认值:null

默认值:null

默认值:null

默认值:1

默认值:取决于 binder implementation。

默认值:false

默认值:false

29.3 使用动态绑定目标

除了使用@EnableBinding定义的 channel 之外,Spring Cloud Stream 允许 applications 将消息发送到动态绑定的目标。例如,当需要在运行时确定目标目标时,这很有用。 Applications 可以使用由@EnableBinding annotation 自动注册的BinderAwareChannelResolver bean 来实现。

'spring.cloud.stream.dynamicDestinations'property 可用于将动态目标名称限制为已知集(白名单)。如果未设置此 property,则可以动态绑定任何目标。

BinderAwareChannelResolver可以直接使用,如下面的 REST 控制器的示例所示,使用路径变量来决定目标 channel:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @PathVariable("target") target,
           @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, target, contentType);
    }

    private void sendMessage(String body, String target, Object contentType) {
        resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}

现在考虑当我们在默认 port(8080)上启动 application 并使用 CURL 发出以下请求时会发生什么:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

目的地,“客户”和“订单”在 broker(在 Rabbit 的交换中或在 Kafka 的 topic 中)创建,名称为“customers”和“orders”,数据将发布到相应的目的地。

BinderAwareChannelResolver是一个 general-purpose Spring Integration DestinationResolver,可以在其他组件中注入 - 例如,在 router 中使用基于传入 JSON 消息的target字段的 SpEL 表达式。以下 example 包含一个读取 SpEL 表达式的 router:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/", method = POST, consumes = "application/json")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, contentType);
    }

    private void sendMessage(Object body, Object contentType) {
        routerChannel().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

    @Bean(name = "routerChannel")
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }
}

Router Sink Application使用此技术创建目标 on-demand。

如果事先知道 channel 名称,则可以将 producer properties 配置为与任何其他目标一样。或者,如果注册NewBindingCallback<> bean,则会在创建 binding 之前调用它。回调采用 binder 使用的扩展 producer properties 的泛型类型。它有一种方法:

void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下 example 显示了如何使用 RabbitMQ binder:

@Bean
public NewBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}

如果需要支持具有多个 binder 类型的动态目标,请使用Object作为泛型类型,并根据需要强制转换extended参数。

首页