26. 编程模型
本节介绍 Spring Cloud Stream 的编程 model。 Spring Cloud Stream 提供了许多预定义的注释,用于声明绑定的输入和输出 channel 以及如何监听 channels。
26.1 声明和 Binding Channels
26.1.1 通过 @EnableBinding 触发 Binding
您可以通过将@EnableBinding
annotation 应用于 application 的 configuration classes 之一,将 Spring application 转换为 Spring Cloud Stream application。 @EnableBinding
annotation 本身是 meta-annotated 和@Configuration
并触发 Spring Cloud Stream 基础结构的 configuration:
...
@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
...
Class<?>[] value() default {};
}
@EnableBinding
annotation 可以将一个或多个接口 classes 作为参数,这些接口包含表示可绑定组件的方法(通常是消息 channels)。
@EnableBinding
annotation 仅在Configuration
classes 上需要,您可以根据需要提供尽可能多的 binding 接口,例如:@EnableBinding(value={Orders.class, Payment.class}
。Order
和Payment
接口都声明@Input
和@Output
channels。
26.1.2 @Input 和 @Output
Spring Cloud Stream application 可以在接口中定义任意数量的输入和输出 channels 作为@Input
和@Output
方法:
public interface Barista {
@Input
SubscribableChannel orders();
@Output
MessageChannel hotDrinks();
@Output
MessageChannel coldDrinks();
}
使用此接口作为@EnableBinding
的参数将分别触发创建名为orders
,hotDrinks
和coldDrinks
的三个绑定 channel。
@EnableBinding(Barista.class)
public class CafeConfiguration {
...
}
在 Spring Cloud Stream 中,可绑定的
MessageChannel
组件是 Spring MessagingMessageChannel
(用于出站),其扩展名为SubscribableChannel
(用于入站)。使用相同的机制可以支持其他可绑定组件。 Sp _Cl支持_Cloud _Cafka binder 是一个这样的 example,其中 KStream 用作 inbound/outboundbindable
组件。在本文档中,我们将继续将 MessageChannels 称为bindable
组件。
自定义 Channel 名称
使用@Input
和@Output
注释,您可以为 channel 指定自定义的 channel name,如下面的示例所示:
public interface Barista {
...
@Input("inboundOrders")
SubscribableChannel orders();
}
在此 example 中,创建的绑定 channel 将命名为inboundOrders
。
Source,Sink 和 Processor
为了便于寻址最常见的用例,包括输入 channel,输出 channel 或两者,Spring Cloud Stream 提供了三个预定义的开箱即用接口。
Source
可用于具有单个出站 channel 的 application。
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
Sink
可用于具有单个入站 channel 的 application。
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
Processor
可用于具有入站 channel 和出站 channel 的 application。
public interface Processor extends Source, Sink {
}
Spring Cloud Stream 不为任何这些接口提供特殊处理;它们只提供开箱即用。
26.1.3 访问绑定的 Channels
注入绑定接口
对于每个绑定接口,Spring Cloud Stream 将生成一个实现接口的 bean。调用其中一个 beans 的@Input
-annotated 或@Output
-annotated 方法将_return 相关的绑定 channel。
以下 example 中的 bean 在调用其hello
方法时在输出 channel 上发送消息。它在注入的Source
bean 上调用output()
来检索目标 channel。
@Component
public class SendingBean {
private Source source;
@Autowired
public SendingBean(Source source) {
this.source = source;
}
public void sayHello(String name) {
source.output().send(MessageBuilder.withPayload(name).build());
}
}
直接注入 Channels
绑定的 channels 也可以直接注入:
@Component
public class SendingBean {
private MessageChannel output;
@Autowired
public SendingBean(MessageChannel output) {
this.output = output;
}
public void sayHello(String name) {
output.send(MessageBuilder.withPayload(name).build());
}
}
如果在声明 annotation 上自定义 channel 的 name,则应使用 name 而不是方法 name。鉴于以下声明:
public interface CustomSource {
...
@Output("customOutput")
MessageChannel output();
}
channel 将被注入,如下面的示例所示:
@Component
public class SendingBean {
private MessageChannel output;
@Autowired
public SendingBean(@Qualifier("customOutput") MessageChannel output) {
this.output = output;
}
public void sayHello(String name) {
this.output.send(MessageBuilder.withPayload(name).build());
}
}
26.1.4 Producing 和 Consuming Messages
您可以使用 Spring Integration annotations 或 Spring Cloud Stream 的@StreamListener
annotation 编写 Spring Cloud Stream application。 @StreamListener
annotation 建模在其他 Spring Messaging annotations 之后(例如@MessageMapping
,@JmsListener
,@RabbitListener
,etc.),但添加 content type management 并输入强制 features。
Native Spring Integration 支持
因为 Spring Cloud Stream 基于 Spring Integration,所以 Stream 完全继承 Integration 的基础和基础结构以及 component 本身。对于 example,您可以将Source
的输出 channel 附加到MessageSource
:
@EnableBinding(Source.class)
public class TimerSource {
@Value("${format}")
private String format;
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
public MessageSource<String> timerMessageSource() {
return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
}
}
或者您可以在变换器中使用处理器的 channels:
@EnableBinding(Processor.class)
public class TransformProcessor {
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
return message.toUpperCase();
}
}
重要的是要明白当你使用
@StreamListener
使用相同的 binding 时,使用 pubsub model,其中每个使用@StreamListener
注释的方法都会收到它自己的消息副本,每个方法都有自己的 consumer group。但是,如果共享可绑定的 channel 作为@Aggregator
,@Transformer
或@ServiceActivator
的输入,那么这些将在竞争的 model 中使用,不会为每个订阅创建单独的 consumer group。
Spring Integration Error Channel 支持
Spring Cloud Stream 支持发布 Spring Integration global 错误 channel 收到的错误消息。通过为名为error
的出站目标配置 binding,可以将发送到errorChannel
的错误消息发布到 broker 上的特定目标。例如,要将错误消息发布到名为“myErrors”的 broker 目标,请提供以下 property:spring.cloud.stream.bindings.error.destination=myErrors
。
消息 Channel Binder 和错误 Channels
从 version 1.3 开始,一些基于MessageChannel
的 binders 将错误发布到每个目标的离散错误 channel。此外,这些错误 channels 桥接到上面提到的 global Spring Integration errorChannel
。因此,您可以使用标准 Spring Integration 流程(IntegrationFlow
,@ServiceActivator
等)消耗所有目标的特定目标 and/or 的错误。
在 consumer 端,listener 线程捕获任何 exceptions 并将ErrorMessage
转发到目标的错误 channel。消息的有效负载是MessagingException
,具有正常的failedMessage
和cause
properties。通常,从 broker 接收的原始数据包含在标题中。对于支持(并配置)死信目的地的 binders; a MessagePublishingErrorHandler
订阅了 channel,原始数据被转发到死信目的地。
在 producer 方面;对于在发布消息(e.g. RabbitMQ,Kafka)后支持某种异步结果的 binders,可以通过将…producer.errorChannelEnabled
设置为true
来启用错误 channel。 ErrorMessage
的有效负载取决于 binder implementation,但是它将是MessagingException
,具有正常的failedMessage
property,以及关于失败的其他 properties。有关完整的详细信息,请参阅 binder 文档。
使用 @StreamListener 进行自动 Content Type 处理
作为其 Spring Integration 支持的补充,Spring Cloud Stream 提供了自己的@StreamListener
annotation,模仿其他 Spring Messaging annotations(e.g. @MessageMapping
,@JmsListener
,@RabbitListener
,etc.)。@StreamListener
annotation 提供了一个更简单的 model 来处理入站消息,尤其是在处理使用时涉及 content type management 和类型强制的案例。
Spring Cloud Stream 提供了一种可扩展的MessageConverter
机制,用于处理绑定 channels 的数据转换,并且在这种情况下,调度到使用@StreamListener
注释的方法。以下是处理外部Vote
events 的 application 的示例:
@EnableBinding(Sink.class)
public class VoteHandler {
@Autowired
VotingService votingService;
@StreamListener(Sink.INPUT)
public void handle(Vote vote) {
votingService.record(vote);
}
}
在考虑具有String
有效负载和application/json
application/json
头的入站Message
时,可以看到@StreamListener
和 Spring Integration @ServiceActivator
之间的区别。在@StreamListener
的情况下,MessageConverter
机制将使用contentType
头将String
有效负载解析为Vote
object。
与其他 Spring Messaging 方法一样,方法 arguments 可以使用@Payload
,@Headers
和@Header
进行批注。
对于 return 数据的方法,必须使用
@SendTo
annotation 指定方法返回的数据的输出 binding 目标:
@EnableBinding(Processor.class)
public class TransformProcessor {
@Autowired
VotingService votingService;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
}
}
使用 @StreamListener 将消息分派给多个方法
由于 version 1.2,Spring Cloud Stream 支持根据条件将消息分派到输入 channel 上注册的多个@StreamListener
方法。
在 order 有资格支持条件分派,方法必须满足以下条件:
-
它不能 return value
-
它必须是单独的消息处理方法(不支持 reactive API 方法)
条件通过 annotation 的condition
属性中的 SpEL 表达式指定,并针对每条消息进行评估。 match 条件的所有处理程序将在同一个线程中调用,并且不必对发生调用的 order 进行任何假设。
使用带有调度条件的@StreamListener
的示例可以在下面看到。在此 example 中,带有带 value foo
的头type
的所有消息将被分派到receiveFoo
方法,带有带 value bar
的头type
的所有消息将被分派到receiveBar
方法。
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'")
public void receiveFoo(@Payload FooPojo fooPojo) {
// handle the message
}
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'")
public void receiveBar(@Payload BarPojo barPojo) {
// handle the message
}
}
通过
@StreamListener
条件调度仅支持单个消息的处理程序,而不支持 reactive 编程支持(如下所述)。
26.1.5 Reactive Programming Support
Spring Cloud Stream 还支持使用 reactive API,其中传入和传出数据作为连续数据流进行处理。可以通过spring-cloud-stream-reactive
获得对 reactive API 的支持,需要将其明确添加到您的项目中。
具有 reactive API 的编程 model 是声明性的,您可以使用 operators 来描述从入站数据流到出站数据流的功能转换,而不是指定应如何处理每条消息。
Spring Cloud Stream 支持以下 reactive API:
-
反应堆
-
RxJava 1.x
将来,它旨在支持基于 Reactive Streams 的更通用的 model。
reactive 编程 model 也使用@StreamListener
annotation 来设置 reactive 处理程序。不同之处在于:
-
@StreamListener
annotation 不能指定输入或输出,因为它们是作为方法的 arguments 和 return 值提供的; -
必须使用
@Input
和@Output
注释方法的 arguments,指示传入和分别传入的数据流将连接到哪个输入或输出; -
方法的 return value(如果有的话)将用
@Output
注释,表示数据应被发送的输入。
Reactive 编程支持需要 Java 1.8.
从 Spring Cloud Stream 1.1.1 及更高版本开始(从发布列号 Brooklyn.SR2 开始),reactive 编程支持需要使用 Reactor 3.0.4.RELEASE 和更高版本。不支持早期的 Reactor 版本(包括 3.0.1.RELEASE,3.0.2.RELEASE 和 3.0.3.RELEASE)。
spring-cloud-stream-reactive
将传递性地检索正确的 version,但是项目结构可以将io.projectreactor:reactor-core
的 version 管理到早期版本,尤其是在使用 Maven 时。通过 Spring Initializr 使用 Spring Boot 1.x 生成的项目就是这种情况,它会将 Reactor version 重写为2.0.8.RELEASE
。在这种情况下,您必须确保释放 artifact 的正确 version。这可以通过将与或更高版本的 version 直接依赖添加到项目中来实现。
术语
reactive
的使用目前指的是正在使用的 reactive API 而不是执行 model 是 reactive(i.e.绑定的 endpoints 仍然使用'push'而不是'pull'model)。虽然使用 Reactor 提供了一些背压支持,但我们打算在 long run 上通过对连接的中间件使用 native reactive clients 来支持完全 reactive 管道。
Reactor-based 处理程序
基于 Reactor 的处理程序可以具有以下参数类型:
-
对于使用
@Input
注释的 arguments,它支持 Reactor 类型Flux
。入站 Flux 的参数化遵循与单个消息处理相同的规则:它可以是整个Message
,可以是Message
有效载荷的 POJO,或者是基于Message
content-type 的转换结果的 POJO。头。提供多个输入; -
对于使用
Output
注释的 arguments,它支持类型FluxSender
,它将方法生成的Flux
与输出连接起来。一般来说,只有当方法可以有多个输出时,才建议将输出指定为 arguments;
基于 Reactor 的处理程序支持_ret 类型,其中必须使用@Output
进行注释。当单个输出通量可用时,我们建议使用方法的 return value。
这是一个简单的 Reactor-based 处理器的示例。
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {
@StreamListener
@Output(Processor.OUTPUT)
public Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
return input.map(s -> s.toUpperCase());
}
}
使用 output arguments 的同一处理器如下所示:
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {
@StreamListener
public void receive(@Input(Processor.INPUT) Flux<String> input,
@Output(Processor.OUTPUT) FluxSender output) {
output.send(input.map(s -> s.toUpperCase()));
}
}
RxJava 1.x 支持
RxJava 1.x 处理程序遵循与 Reactor-based 一样的规则,但将使用Observable
和ObservableSender
arguments 和 return 类型。
所以上面的第一个例子将成为:
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {
@StreamListener
@Output(Processor.OUTPUT)
public Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
return input.map(s -> s.toUpperCase());
}
}
上面的第二个例子将成为:
@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {
@StreamListener
public void receive(@Input(Processor.INPUT) Observable<String> input,
@Output(Processor.OUTPUT) ObservableSender output) {
output.send(input.map(s -> s.toUpperCase()));
}
}
Reactive Sources
Spring Cloud Stream reactive 支持还提供了通过 StreamEmitter annotation 创建 reactive 源的能力。使用 StreamEmitter annotation,可以将常规源转换为 reactive 源。 StreamEmitter 是一个方法 level annotation,它将方法标记为通过 EnableBinding 声明的输出的发射器。不允许将 Input annotation 与 StreamEmitter 一起使用,因为使用此 annotation 标记的方法不会从任何输入进行侦听,而是生成到输出。遵循 StreamListener 中使用的相同编程 model,StreamEmitter 还允许使用 Output annotation 的灵活方式,具体取决于方法是否具有 arguments,return 类型等。
以下是使用各种样式的 StreamEmitter 的一些示例。
以下 example 将每毫秒发出“Hello World”消息并发布到 Flux。在这种情况下,Flux 中生成的消息将被发送到 Source 的输出 channel。
@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {
@StreamEmitter
@Output(Source.OUTPUT)
public Flux<String> emit() {
return Flux.intervalMillis(1)
.map(l -> "Hello World");
}
}
以下是与上述相同的另一种样品。此方法使用 FluxSender 以编程方式从源发送 Flux,而不是返回 Flux。
@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {
@StreamEmitter
@Output(Source.OUTPUT)
public void emit(FluxSender output) {
output.send(Flux.intervalMillis(1)
.map(l -> "Hello World"));
}
}
以下内容与功能和样式上的上述代码段完全相同。但是,它不是在方法 level 上使用显式的 Output annotation,而是用作方法参数 level。
@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {
@StreamEmitter
public void emit(@Output(Source.OUTPUT) FluxSender output) {
output.send(Flux.intervalMillis(1)
.map(l -> "Hello World"));
}
}
下面是使用 Reactive Streams Publisher API 编写反应源的另一种方法,以及Spring Integration Java DSL中对它的支持。发布者仍在使用 Reactor Flux,但从 application 的角度来看,这对用户是透明的,只需要 Reactive Streams 和 Java DSL 来实现 Spring Integration。
@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {
@StreamEmitter
@Output(Source.OUTPUT)
@Bean
public Publisher<Message<String>> emit() {
return IntegrationFlows.from(() ->
new GenericMessage<>("Hello World"),
e -> e.poller(p -> p.fixedDelay(1)))
.toReactivePublisher();
}
}
26.1.6 聚合
Spring Cloud Stream 支持将多个应用程序聚合在一起,直接连接其输入和输出 channels,并避免通过 broker 交换消息的额外成本。从 Spring Cloud Stream 的 version 1.0 开始,仅支持以下类型的 applications 聚合:
-
sources - 具有单个输出的 applications channel 名为
output
,通常具有org.springframework.cloud.stream.messaging.Source
类型的单个 binding -
sinks - 带有名为
input
的单个输入 channel 的 applications,通常只有一个org.springframework.cloud.stream.messaging.Sink
类型的 binding -
处理器 - 具有名为
input
的单个输入 channel 和名为output
的单个输出 channel 的 applications,通常具有org.springframework.cloud.stream.messaging.Processor
类型的单个 binding。
它们可以通过创建一系列互连的 applications 来聚合在一起,其中序列中元素的输出 channel 连接到下一个元素的输入 channel(如果存在)。序列可以从源或处理器开始,它可以包含任意数量的处理器,并且必须以处理器或 sink 结束。
根据起始和结束元素的性质,序列可以具有一个或多个可绑定的 channel,如下所示:
-
如果序列以 source 开头,ends 以 sink 开头,则 applications 之间的所有通信都是直接的,不会绑定 channels
-
如果序列以处理器开始,那么它的输入 channel 将成为聚合的
input
channel 并将相应地绑定 -
如果序列_end 与处理器,则其输出 channel 将成为聚合的
output
channel 并将相应地绑定
使用AggregateApplicationBuilder
实用程序 class 执行聚合,如下面的 example 中所示。让我们考虑一个项目,其中我们有源,处理器和 sink,它们可以在项目中定义,或者可以包含在项目的一个依赖项中。
如果 configuration classes 使用
@SpringBootApplication
,则必须在单独的包中提供聚合 application 中的每个 component(source,sink 或 processor)。由于@SpringBootApplication
在同一个包内的 configuration classes 上执行 classpath 扫描,因此需要避免_app之间的 cross-talk。在下面的 example 中,可以看到 Source,Processor 和 Sink application classes 分组在不同的包中。一种可能的替代方法是在单独的@Configuration
class 中提供 source,sink 或 processor configuration,避免使用@SpringBootApplication
/@ComponentScan
并将其用于聚合。
package com.app.mysink;
@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {
private static Logger logger = LoggerFactory.getLogger(SinkApplication.class);
@ServiceActivator(inputChannel=Sink.INPUT)
public void loggerSink(Object payload) {
logger.info("Received: " + payload);
}
}
package com.app.myprocessor;
// Imports omitted
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {
@Transformer
public String loggerSink(String payload) {
return payload.toUpperCase();
}
}
package com.app.mysource;
// Imports omitted
@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {
@InboundChannelAdapter(value = Source.OUTPUT)
public String timerMessageSource() {
return new SimpleDateFormat().format(new Date());
}
}
每个 configuration 都可用于 running 一个单独的 component,但在这种情况下,它们可以按如下方式聚合在一起:
package com.app;
// Imports omitted
@SpringBootApplication
public class SampleAggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApplication.class).args("--fixedDelay=5000")
.via(ProcessorApplication.class)
.to(SinkApplication.class).args("--debug=true").run(args);
}
}
序列的起始 component 作为from()
方法的参数提供。序列的结束 component 作为to()
方法的参数提供。提供中间处理器作为via()
方法的参数。可以将相同类型的多个处理器链接在一起(e.g. 用于具有不同配置的流水线转换)。对于每个 component,构建器可以为 Spring Boot configuration 提供运行时 arguments。
配置聚合应用程序
Spring Cloud Stream 支持使用“namespace”作为前缀为聚合 application 中的各个 applications 传递 properties。
可以为 applications 设置命名空间,如下所示:
@SpringBootApplication
public class SampleAggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApplication.class).namespace("source").args("--fixedDelay=5000")
.via(ProcessorApplication.class).namespace("processor1")
.to(SinkApplication.class).namespace("sink").args("--debug=true").run(args);
}
}
为单个 applications 设置'namespace'后,可以使用任何支持的 property 源(命令行,环境 properties 等)将带有namespace
as 前缀的 application properties 传递给 aggregate application。
例如,要覆盖'source'和'sink'applications 的默认fixedDelay
和debug
properties:
java -jar target/MyAggregateApplication-0.0.1-SNAPSHOT.jar --source.fixedDelay=10000 --sink.debug=false
为非自包含聚合 application 配置 binding service properties
非 self-contained aggregate application 通过聚合 application 的 inbound/outbound 组件(通常是消息 channels)中的一个或两个绑定到外部 broker,而聚合 application 中的 applications 直接绑定。对于 example:源 application 的输出和处理器 application 的输入直接绑定,而处理器的输出 channel 绑定到 broker 的外部目标。传递包含 aggregate application 的 non-self 的 binding 服务 properties 时,需要将 binding 服务 properties 传递给聚合 application,而不是将它们设置为'args'到个别 child application。例如,
@SpringBootApplication
public class SampleAggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder()
.from(SourceApplication.class).namespace("source").args("--fixedDelay=5000")
.via(ProcessorApplication.class).namespace("processor1").args("--debug=true").run(args);
}
}
像--spring.cloud.stream.bindings.output.destination=processor-output
这样的 binding properties 需要指定为外部 configuration properties 之一(cmdline arg 等)。