28. Configuration 选项

Spring Cloud Stream 支持常规 configuration 选项以及绑定和 binders 的 configuration。一些 binders 允许额外的 binding properties 来支持 middleware-specific features。

Configuration 选项可以通过 Spring Boot 支持的任何机制提供给 Spring Cloud Stream applications。这包括 application arguments,环境变量和 YAML 或.properties files。

28.1 Spring Cloud Stream Properties

默认值:1

默认值:空(允许绑定任何目标)。

默认值:空。

默认值:false。

28.2 Binding Properties

Binding properties 使用spring.cloud.stream.bindings.<channelName>.<property>=<value>格式提供。 <channelName>表示正在配置的 channel 的 name(e.g. ,output表示Source)。

为避免重复,Spring Cloud Stream 支持所有 channel 的设置值,格式为spring.cloud.stream.default.<property>=<value>

在下文中,我们指出我们在哪里省略了spring.cloud.stream.bindings.<channelName>.前缀并仅关注 property name,并理解前缀将包含在运行时。

28.2.1 使用 Spring Cloud Stream 的属性

以下 binding properties 可用于输入和输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.,e.g 为前缀。 spring.cloud.stream.bindings.input.destination=ticktock

可以使用前缀spring.cloud.stream.default,e.g 设置默认值。 spring.cloud.stream.default.contentType=application/json

默认值:null(表示匿名 consumer)。

默认值:null(这样就不会执行类型强制)。

默认值:null(将使用默认的 binder,如果存在的话)。

28.2.2 Consumer properties

以下 binding properties 仅可用于输入绑定,并且必须以spring.cloud.stream.bindings.<channelName>.consumer.,e.g 为前缀。 spring.cloud.stream.bindings.input.consumer.concurrency=3

可以使用前缀spring.cloud.stream.default.consumer,e.g 设置默认值。 spring.cloud.stream.default.consumer.headerMode=raw

默认值:1

默认值:false

默认值:embeddedHeaders

默认值:3

默认值:1000

默认值:10000

默认值:2.0

默认值:-1

默认值:-1

28.2.3 Producer Properties

以下 binding properties 仅可用于输出绑定,并且必须以spring.cloud.stream.bindings.<channelName>.producer.,e.g 为前缀。 spring.cloud.stream.bindings.input.producer.partitionKeyExpression=payload.id

可以使用前缀spring.cloud.stream.default.producer,e.g 设置默认值。 spring.cloud.stream.default.producer.partitionKeyExpression=payload.id

默认值:null。

默认值:null。

默认值:null。

默认值:null。

默认值:1

默认值:embeddedHeaders

默认值:false

默认值:false

28.3 使用动态绑定目标

除了通过@EnableBinding定义的 channels 之外,Spring Cloud Stream 允许 applications 将消息发送到动态绑定目的地。例如,当需要在运行时确定目标目标时,这很有用。 Applications 可以使用由@EnableBinding annotation 自动注册的BinderAwareChannelResolver bean 来实现。

property'spring.cloud.stream.dynamicDestinations'可用于将动态目标名称限制为事先已知的集合(白名单)。如果未设置 property,则可以动态绑定任何目标。

BinderAwareChannelResolver可以直接使用,如下面的示例所示,其中 REST 控制器使用路径变量来决定目标 channel。

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

	@Autowired
	private BinderAwareChannelResolver resolver;

	@RequestMapping(path = "/{target}", method = POST, consumes = "*/*")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @PathVariable("target") target,
	       @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, target, contentType);
	}

	private void sendMessage(String body, String target, Object contentType) {
		resolver.resolveDestination(target).send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}
}

在默认 port 8080 上启动 application 后,发送以下数据时:

curl -H "Content-Type: application/json" -X POST -d "customer-1" http://localhost:8080/customers

curl -H "Content-Type: application/json" -X POST -d "order-1" http://localhost:8080/orders

目的地“客户”和“订单”在 broker 中创建(对于示例:在 Rabbit 情况下为 Exchange,如果是 Kafka 则为 topic),其名称为“customers”和“orders”,数据将发布到相应的目的地。

BinderAwareChannelResolver是一个通用的 Spring Integration DestinationResolver,可以注入其他组件。对于 example,在 router 中使用基于传入 JSON 消息的target字段的 SpEL 表达式。

@EnableBinding
@Controller
public class SourceWithDynamicDestination {

	@Autowired
	private BinderAwareChannelResolver resolver;

	@RequestMapping(path = "/", method = POST, consumes = "application/json")
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
		sendMessage(body, contentType);
	}

	private void sendMessage(Object body, Object contentType) {
		routerChannel().send(MessageBuilder.createMessage(body,
				new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
	}

	@Bean(name = "routerChannel")
	public MessageChannel routerChannel() {
		return new DirectChannel();
	}

	@Bean
	@ServiceActivator(inputChannel = "routerChannel")
	public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router =
            new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.target"));
		router.setDefaultOutputChannelName("default-output");
		router.setChannelResolver(resolver);
		return router;
	}
}
首页