31. Configuration Options

Spring Cloud Stream 支持常规配置选项以及绑定和 Binder 的配置。一些绑定器使其他绑定属性支持特定于中间件的功能。

可以通过 Spring Boot 支持的任何机制将配置选项提供给 Spring Cloud Stream 应用程序。这包括应用程序参数,环境变量以及 YAML 或.properties 文件。

31.1 绑定服务属性

这些属性通过org.springframework.cloud.stream.config.BindingServiceProperties公开

默认值:1

默认值:空(将任何目的地绑定)。

Default: empty.

默认值:false

默认值:30

31.2 绑定属性

绑定属性通过使用spring.cloud.stream.bindings.<channelName>.<property>=<value>的格式提供。 <channelName>代表正在配置的通道的名称(例如,output表示Source)。

为了避免重复,Spring Cloud Stream 支持以spring.cloud.stream.default.<property>=<value>的格式为所有通道设置值。

在避免重复使用扩展绑定属性时,应使用此格式-spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>

在下面的内容中,我们指出了省略了spring.cloud.stream.bindings.<channelName>.前缀的位置,仅着重于属性名称,同时要了解在运行时会包含该前缀。

31.2.1 通用绑定属性

这些属性通过org.springframework.cloud.stream.config.BindingProperties公开

以下绑定属性可用于 Importing 和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.作为前缀(例如spring.cloud.stream.bindings.input.destination=ticktock)。

可以使用spring.cloud.stream.default前缀设置默认值(例如 spring.cloud.stream.default.contentType = application/json)。

默认值:null(表示匿名使用者)。

默认值:application/json

默认值:null(使用默认的 Binder,如果存在的话)。

31.2.2Consumer 属性

这些属性通过org.springframework.cloud.stream.binder.ConsumerProperties公开

以下绑定属性仅可用于 Importing 绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.作为前缀(例如spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以使用spring.cloud.stream.default.consumer前缀(例如spring.cloud.stream.default.consumer.headerMode=none)设置默认值。

默认值:1

默认值:false

默认值:取决于 Binder 的实现。

默认值:3

默认值:1000

默认值:10000

默认值:2.0

默认值:true

默认值:-1

默认值:-1

Default: empty.

默认值:false

31.2.3 生产者属性

这些属性通过org.springframework.cloud.stream.binder.ProducerProperties公开

以下绑定属性仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.作为前缀(例如spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id)。

可以使用前缀spring.cloud.stream.default.producer设置默认值(例如spring.cloud.stream.default.producer.partitionKeyExpression=payload.id)。

Default: null.

默认值:null

默认值:null

默认值:null

默认值:1

默认值:取决于 Binder 的实现。

默认值:false

默认值:false

31.3 使用动态绑定的目的地

除了使用@EnableBinding定义的通道外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目的地。例如,当需要在运行时确定目标目的地时,这很有用。应用程序可以使用由@EnableBinding注解自动注册的BinderAwareChannelResolver bean 来实现。

“ spring.cloud.stream.dynamicDestinations”属性可用于将动态目标名称限制为已知集合(白名单)。如果未设置此属性,则可以动态绑定任何目标。

BinderAwareChannelResolver可以直接使用,如以下使用路径变量来确定目标通道的 REST 控制器示例所示:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @PathVariable("target") target,
           @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, target, contentType);
    }

    private void sendMessage(String body, String target, Object contentType) {
        resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }
}

现在考虑当我们在默认端口(8080)上启动应用程序并使用 CURL 发出以下请求时会发生什么:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

在代理中创建目的地(“Client”和“订单”)(在 Rabbit 的交换中或在 Kafka 的主题中),名称为“Client”和“订单”,并将数据发布到适当的目的地。

BinderAwareChannelResolver是通用的 Spring Integration DestinationResolver,并且可以注入到其他组件中,例如,在 Router 中使用 SpEL 表达式,该表达式基于传入 JSON 消息的target字段。以下示例包含一个读取 SpEL 表达式的 Router:

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @RequestMapping(path = "/", method = POST, consumes = "application/json")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
        sendMessage(body, contentType);
    }

    private void sendMessage(Object body, Object contentType) {
        routerChannel().send(MessageBuilder.createMessage(body,
                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
    }

    @Bean(name = "routerChannel")
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }
}

Router 接收器应用使用此技术按需创建目的地。

如果预先知道通道名称,则可以像其他任何目的地一样配置生产者属性。或者,如果您注册NewBindingCallback<> bean,则在创建绑定之前调用它。回调采用绑定程序使用的扩展生产者属性的通用类型。它有一种方法:

void configure(String channelName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例显示了如何使用 RabbitMQ 绑定器:

@Bean
public NewBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}

Note

如果需要支持具有多种 Binder 类型的动态目标,请对通用类型使用Object并根据需要强制转换extended参数。

首页