26. 编程模型

本节介绍 Spring Cloud Stream 的编程 model。 Spring Cloud Stream 提供了许多预定义的注释,用于声明绑定的输入和输出 channel 以及如何监听 channels。

26.1 声明和 Binding Channels

26.1.1 通过 @EnableBinding 触发 Binding

您可以通过将@EnableBinding annotation 应用于 application 的 configuration classes 之一,将 Spring application 转换为 Spring Cloud Stream application。 @EnableBinding annotation 本身是 meta-annotated 和@Configuration并触发 Spring Cloud Stream 基础结构的 configuration:

...
@Import(...)
@Configuration
@EnableIntegration
public @interface EnableBinding {
    ...
    Class<?>[] value() default {};
}

@EnableBinding annotation 可以将一个或多个接口 classes 作为参数,这些接口包含表示可绑定组件的方法(通常是消息 channels)。

@EnableBinding annotation 仅在Configuration classes 上需要,您可以根据需要提供尽可能多的 binding 接口,例如:@EnableBinding(value={Orders.class, Payment.class}OrderPayment接口都声明@Input@Output channels。

26.1.2 @Input 和 @Output

Spring Cloud Stream application 可以在接口中定义任意数量的输入和输出 channels 作为@Input@Output方法:

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

使用此接口作为@EnableBinding的参数将分别触发创建名为ordershotDrinkscoldDrinks的三个绑定 channel。

@EnableBinding(Barista.class)
public class CafeConfiguration {

   ...
}

在 Spring Cloud Stream 中,可绑定的MessageChannel组件是 Spring Messaging MessageChannel(用于出站),其扩展名为SubscribableChannel(用于入站)。使用相同的机制可以支持其他可绑定组件。 Sp _Cl支持_Cloud _Cafka binder 是一个这样的 example,其中 KStream 用作 inbound/outbound bindable组件。在本文档中,我们将继续将 MessageChannels 称为bindable组件。

自定义 Channel 名称

使用@Input@Output 注释,您可以为 channel 指定自定义的 channel name,如下面的示例所示:

public interface Barista {
    ...
    @Input("inboundOrders")
    SubscribableChannel orders();
}

在此 example 中,创建的绑定 channel 将命名为inboundOrders

Source,Sink 和 Processor

为了便于寻址最常见的用例,包括输入 channel,输出 channel 或两者,Spring Cloud Stream 提供了三个预定义的开箱即用接口。

Source可用于具有单个出站 channel 的 application。

public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();

}

Sink可用于具有单个入站 channel 的 application。

public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();

}

Processor可用于具有入站 channel 和出站 channel 的 application。

public interface Processor extends Source, Sink {
}

Spring Cloud Stream 不为任何这些接口提供特殊处理;它们只提供开箱即用。

26.1.3 访问绑定的 Channels

注入绑定接口

对于每个绑定接口,Spring Cloud Stream 将生成一个实现接口的 bean。调用其中一个 beans 的@Input -annotated 或@Output -annotated 方法将_return 相关的绑定 channel。

以下 example 中的 bean 在调用其hello方法时在输出 channel 上发送消息。它在注入的Source bean 上调用output()来检索目标 channel。

@Component
public class SendingBean {

    private Source source;

    @Autowired
    public SendingBean(Source source) {
        this.source = source;
    }

    public void sayHello(String name) {
         source.output().send(MessageBuilder.withPayload(name).build());
    }
}

直接注入 Channels

绑定的 channels 也可以直接注入:

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
         output.send(MessageBuilder.withPayload(name).build());
    }
}

如果在声明 annotation 上自定义 channel 的 name,则应使用 name 而不是方法 name。鉴于以下声明:

public interface CustomSource {
    ...
    @Output("customOutput")
    MessageChannel output();
}

channel 将被注入,如下面的示例所示:

@Component
public class SendingBean {

    private MessageChannel output;

    @Autowired
    public SendingBean(@Qualifier("customOutput") MessageChannel output) {
        this.output = output;
    }

    public void sayHello(String name) {
         this.output.send(MessageBuilder.withPayload(name).build());
    }
}

26.1.4 Producing 和 Consuming Messages

您可以使用 Spring Integration annotations 或 Spring Cloud Stream 的@StreamListener annotation 编写 Spring Cloud Stream application。 @StreamListener annotation 建模在其他 Spring Messaging annotations 之后(例如@MessageMapping@JmsListener@RabbitListener,etc.),但添加 content type management 并输入强制 features。

Native Spring Integration 支持

因为 Spring Cloud Stream 基于 Spring Integration,所以 Stream 完全继承 Integration 的基础和基础结构以及 component 本身。对于 example,您可以将Source的输出 channel 附加到MessageSource

@EnableBinding(Source.class)
public class TimerSource {

  @Value("${format}")
  private String format;

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "${fixedDelay}", maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>(new SimpleDateFormat(format).format(new Date()));
  }
}

或者您可以在变换器中使用处理器的 channels:

@EnableBinding(Processor.class)
public class TransformProcessor {
  @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
  public Object transform(String message) {
    return message.toUpperCase();
  }
}

重要的是要明白当你使用@StreamListener使用相同的 binding 时,使用 pubsub model,其中每个使用@StreamListener注释的方法都会收到它自己的消息副本,每个方法都有自己的 consumer group。但是,如果共享可绑定的 channel 作为@Aggregator@Transformer@ServiceActivator的输入,那么这些将在竞争的 model 中使用,不会为每个订阅创建单独的 consumer group。

Spring Integration Error Channel 支持

Spring Cloud Stream 支持发布 Spring Integration global 错误 channel 收到的错误消息。通过为名为error的出站目标配置 binding,可以将发送到errorChannel的错误消息发布到 broker 上的特定目标。例如,要将错误消息发布到名为“myErrors”的 broker 目标,请提供以下 property:spring.cloud.stream.bindings.error.destination=myErrors

消息 Channel Binder 和错误 Channels

从 version 1.3 开始,一些基于MessageChannel的 binders 将错误发布到每个目标的离散错误 channel。此外,这些错误 channels 桥接到上面提到的 global Spring Integration errorChannel。因此,您可以使用标准 Spring Integration 流程(IntegrationFlow@ServiceActivator等)消耗所有目标的特定目标 and/or 的错误。

在 consumer 端,listener 线程捕获任何 exceptions 并将ErrorMessage转发到目标的错误 channel。消息的有效负载是MessagingException,具有正常的failedMessagecause properties。通常,从 broker 接收的原始数据包含在标题中。对于支持(并配置)死信目的地的 binders; a MessagePublishingErrorHandler订阅了 channel,原始数据被转发到死信目的地。

在 producer 方面;对于在发布消息(e.g. RabbitMQ,Kafka)后支持某种异步结果的 binders,可以通过将…producer.errorChannelEnabled设置为true来启用错误 channel。 ErrorMessage的有效负载取决于 binder implementation,但是它将是MessagingException,具有正常的failedMessage property,以及关于失败的其他 properties。有关完整的详细信息,请参阅 binder 文档。

使用 @StreamListener 进行自动 Content Type 处理

作为其 Spring Integration 支持的补充,Spring Cloud Stream 提供了自己的@StreamListener annotation,模仿其他 Spring Messaging annotations(e.g. @MessageMapping@JmsListener@RabbitListener,etc.)。@StreamListener annotation 提供了一个更简单的 model 来处理入站消息,尤其是在处理使用时涉及 content type management 和类型强制的案例。

Spring Cloud Stream 提供了一种可扩展的MessageConverter机制,用于处理绑定 channels 的数据转换,并且在这种情况下,调度到使用@StreamListener注释的方法。以下是处理外部Vote events 的 application 的示例:

@EnableBinding(Sink.class)
public class VoteHandler {

  @Autowired
  VotingService votingService;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}

在考虑具有String有效负载和application/json application/json头的入站Message时,可以看到@StreamListener和 Spring Integration @ServiceActivator之间的区别。在@StreamListener的情况下,MessageConverter机制将使用contentType头将String有效负载解析为Vote object。

与其他 Spring Messaging 方法一样,方法 arguments 可以使用@Payload@Headers@Header进行批注。

对于 return 数据的方法,必须使用@SendTo annotation 指定方法返回的数据的输出 binding 目标:

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}

使用 @StreamListener 将消息分派给多个方法

由于 version 1.2,Spring Cloud Stream 支持根据条件将消息分派到输入 channel 上注册的多个@StreamListener方法。

在 order 有资格支持条件分派,方法必须满足以下条件:

  • 它不能 return value

  • 它必须是单独的消息处理方法(不支持 reactive API 方法)

条件通过 annotation 的condition属性中的 SpEL 表达式指定,并针对每条消息进行评估。 match 条件的所有处理程序将在同一个线程中调用,并且不必对发生调用的 order 进行任何假设。

使用带有调度条件的@StreamListener的示例可以在下面看到。在此 example 中,带有带 value foo的头type的所有消息将被分派到receiveFoo方法,带有带 value bar的头type的所有消息将被分派到receiveBar方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='foo'")
    public void receiveFoo(@Payload FooPojo fooPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bar'")
    public void receiveBar(@Payload BarPojo barPojo) {
       // handle the message
    }
}

通过@StreamListener条件调度仅支持单个消息的处理程序,而不支持 reactive 编程支持(如下所述)。

26.1.5 Reactive Programming Support

Spring Cloud Stream 还支持使用 reactive API,其中传入和传出数据作为连续数据流进行处理。可以通过spring-cloud-stream-reactive获得对 reactive API 的支持,需要将其明确添加到您的项目中。

具有 reactive API 的编程 model 是声明性的,您可以使用 operators 来描述从入站数据流到出站数据流的功能转换,而不是指定应如何处理每条消息。

Spring Cloud Stream 支持以下 reactive API:

  • 反应堆

  • RxJava 1.x

将来,它旨在支持基于 Reactive Streams 的更通用的 model。

reactive 编程 model 也使用@StreamListener annotation 来设置 reactive 处理程序。不同之处在于:

  • @StreamListener annotation 不能指定输入或输出,因为它们是作为方法的 arguments 和 return 值提供的;

  • 必须使用@Input@Output注释方法的 arguments,指示传入和分别传入的数据流将连接到哪个输入或输出;

  • 方法的 return value(如果有的话)将用@Output注释,表示数据应被发送的输入。

Reactive 编程支持需要 Java 1.8.

从 Spring Cloud Stream 1.1.1 及更高版本开始(从发布列号 Brooklyn.SR2 开始),reactive 编程支持需要使用 Reactor 3.0.4.RELEASE 和更高版本。不支持早期的 Reactor 版本(包括 3.0.1.RELEASE,3.0.2.RELEASE 和 3.0.3.RELEASE)。 spring-cloud-stream-reactive将传递性地检索正确的 version,但是项目结构可以将io.projectreactor:reactor-core的 version 管理到早期版本,尤其是在使用 Maven 时。通过 Spring Initializr 使用 Spring Boot 1.x 生成的项目就是这种情况,它会将 Reactor version 重写为2.0.8.RELEASE。在这种情况下,您必须确保释放 artifact 的正确 version。这可以通过将与或更高版本的 version 直接依赖添加到项目中来实现。

术语reactive的使用目前指的是正在使用的 reactive API 而不是执行 model 是 reactive(i.e.绑定的 endpoints 仍然使用'push'而不是'pull'model)。虽然使用 Reactor 提供了一些背压支持,但我们打算在 long run 上通过对连接的中间件使用 native reactive clients 来支持完全 reactive 管道。

Reactor-based 处理程序

基于 Reactor 的处理程序可以具有以下参数类型:

  • 对于使用@Input注释的 arguments,它支持 Reactor 类型Flux。入站 Flux 的参数化遵循与单个消息处理相同的规则:它可以是整个Message,可以是Message有效载荷的 POJO,或者是基于Message content-type 的转换结果的 POJO。头。提供多个输入;

  • 对于使用Output注释的 arguments,它支持类型FluxSender,它将方法生成的Flux与输出连接起来。一般来说,只有当方法可以有多个输出时,才建议将输出指定为 arguments;

基于 Reactor 的处理程序支持_ret 类型,其中必须使用@Output进行注释。当单个输出通量可用时,我们建议使用方法的 return value。

这是一个简单的 Reactor-based 处理器的示例。

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  @Output(Processor.OUTPUT)
  public Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
    return input.map(s -> s.toUpperCase());
  }
}

使用 output arguments 的同一处理器如下所示:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  public void receive(@Input(Processor.INPUT) Flux<String> input,
     @Output(Processor.OUTPUT) FluxSender output) {
     output.send(input.map(s -> s.toUpperCase()));
  }
}

RxJava 1.x 支持

RxJava 1.x 处理程序遵循与 Reactor-based 一样的规则,但将使用ObservableObservableSender arguments 和 return 类型。

所以上面的第一个例子将成为:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  @Output(Processor.OUTPUT)
  public Observable<String> receive(@Input(Processor.INPUT) Observable<String> input) {
    return input.map(s -> s.toUpperCase());
  }
}

上面的第二个例子将成为:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  public void receive(@Input(Processor.INPUT) Observable<String> input,
     @Output(Processor.OUTPUT) ObservableSender output) {
     output.send(input.map(s -> s.toUpperCase()));
  }
}

Reactive Sources

Spring Cloud Stream reactive 支持还提供了通过 StreamEmitter annotation 创建 reactive 源的能力。使用 StreamEmitter annotation,可以将常规源转换为 reactive 源。 StreamEmitter 是一个方法 level annotation,它将方法标记为通过 EnableBinding 声明的输出的发射器。不允许将 Input annotation 与 StreamEmitter 一起使用,因为使用此 annotation 标记的方法不会从任何输入进行侦听,而是生成到输出。遵循 StreamListener 中使用的相同编程 model,StreamEmitter 还允许使用 Output annotation 的灵活方式,具体取决于方法是否具有 arguments,return 类型等。

以下是使用各种样式的 StreamEmitter 的一些示例。

以下 example 将每毫秒发出“Hello World”消息并发布到 Flux。在这种情况下,Flux 中生成的消息将被发送到 Source 的输出 channel。

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  public Flux<String> emit() {
    return Flux.intervalMillis(1)
            .map(l -> "Hello World");
  }
}

以下是与上述相同的另一种样品。此方法使用 FluxSender 以编程方式从源发送 Flux,而不是返回 Flux。

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  public void emit(FluxSender output) {
    output.send(Flux.intervalMillis(1)
            .map(l -> "Hello World"));
  }
}

以下内容与功能和样式上的上述代码段完全相同。但是,它不是在方法 level 上使用显式的 Output annotation,而是用作方法参数 level。

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  public void emit(@Output(Source.OUTPUT) FluxSender output) {
    output.send(Flux.intervalMillis(1)
            .map(l -> "Hello World"));
  }
}

下面是使用 Reactive Streams Publisher API 编写反应源的另一种方法,以及Spring Integration Java DSL中对它的支持。发布者仍在使用 Reactor Flux,但从 application 的角度来看,这对用户是透明的,只需要 Reactive Streams 和 Java DSL 来实现 Spring Integration。

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  @Bean
  public Publisher<Message<String>> emit() {
    return IntegrationFlows.from(() ->
                new GenericMessage<>("Hello World"),
        e -> e.poller(p -> p.fixedDelay(1)))
        .toReactivePublisher();
  }
}

26.1.6 聚合

Spring Cloud Stream 支持将多个应用程序聚合在一起,直接连接其输入和输出 channels,并避免通过 broker 交换消息的额外成本。从 Spring Cloud Stream 的 version 1.0 开始,仅支持以下类型的 applications 聚合:

  • sources - 具有单个输出的 applications channel 名为output,通常具有org.springframework.cloud.stream.messaging.Source类型的单个 binding

  • sinks - 带有名为input的单个输入 channel 的 applications,通常只有一个org.springframework.cloud.stream.messaging.Sink类型的 binding

  • 处理器 - 具有名为input的单个输入 channel 和名为output的单个输出 channel 的 applications,通常具有org.springframework.cloud.stream.messaging.Processor类型的单个 binding。

它们可以通过创建一系列互连的 applications 来聚合在一起,其中序列中元素的输出 channel 连接到下一个元素的输入 channel(如果存在)。序列可以从源或处理器开始,它可以包含任意数量的处理器,并且必须以处理器或 sink 结束。

根据起始和结束元素的性质,序列可以具有一个或多个可绑定的 channel,如下所示:

  • 如果序列以 source 开头,ends 以 sink 开头,则 applications 之间的所有通信都是直接的,不会绑定 channels

  • 如果序列以处理器开始,那么它的输入 channel 将成为聚合的input channel 并将相应地绑定

  • 如果序列_end 与处理器,则其输出 channel 将成为聚合的output channel 并将相应地绑定

使用AggregateApplicationBuilder实用程序 class 执行聚合,如下面的 example 中所示。让我们考虑一个项目,其中我们有源,处理器和 sink,它们可以在项目中定义,或者可以包含在项目的一个依赖项中。

如果 configuration classes 使用@SpringBootApplication,则必须在单独的包中提供聚合 application 中的每个 component(source,sink 或 processor)。由于@SpringBootApplication在同一个包内的 configuration classes 上执行 classpath 扫描,因此需要避免_app之间的 cross-talk。在下面的 example 中,可以看到 Source,Processor 和 Sink application classes 分组在不同的包中。一种可能的替代方法是在单独的@Configuration class 中提供 source,sink 或 processor configuration,避免使用@SpringBootApplication/@ComponentScan并将其用于聚合。

package com.app.mysink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {

	private static Logger logger = LoggerFactory.getLogger(SinkApplication.class);

	@ServiceActivator(inputChannel=Sink.INPUT)
	public void loggerSink(Object payload) {
		logger.info("Received: " + payload);
	}
}
package com.app.myprocessor;

// Imports omitted

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {

	@Transformer
	public String loggerSink(String payload) {
		return payload.toUpperCase();
	}
}
package com.app.mysource;

// Imports omitted

@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {

	@InboundChannelAdapter(value = Source.OUTPUT)
	public String timerMessageSource() {
		return new SimpleDateFormat().format(new Date());
	}
}

每个 configuration 都可用于 running 一个单独的 component,但在这种情况下,它们可以按如下方式聚合在一起:

package com.app;

// Imports omitted

@SpringBootApplication
public class SampleAggregateApplication {

	public static void main(String[] args) {
		new AggregateApplicationBuilder()
			.from(SourceApplication.class).args("--fixedDelay=5000")
			.via(ProcessorApplication.class)
			.to(SinkApplication.class).args("--debug=true").run(args);
	}
}

序列的起始 component 作为from()方法的参数提供。序列的结束 component 作为to()方法的参数提供。提供中间处理器作为via()方法的参数。可以将相同类型的多个处理器链接在一起(e.g. 用于具有不同配置的流水线转换)。对于每个 component,构建器可以为 Spring Boot configuration 提供运行时 arguments。

配置聚合应用程序

Spring Cloud Stream 支持使用“namespace”作为前缀为聚合 application 中的各个 applications 传递 properties。

可以为 applications 设置命名空间,如下所示:

@SpringBootApplication
public class SampleAggregateApplication {

	public static void main(String[] args) {
		new AggregateApplicationBuilder()
			.from(SourceApplication.class).namespace("source").args("--fixedDelay=5000")
			.via(ProcessorApplication.class).namespace("processor1")
			.to(SinkApplication.class).namespace("sink").args("--debug=true").run(args);
	}
}

为单个 applications 设置'namespace'后,可以使用任何支持的 property 源(命令行,环境 properties 等)将带有namespace as 前缀的 application properties 传递给 aggregate application。

例如,要覆盖'source'和'sink'applications 的默认fixedDelaydebug properties:

java -jar target/MyAggregateApplication-0.0.1-SNAPSHOT.jar --source.fixedDelay=10000 --sink.debug=false

为非自包含聚合 application 配置 binding service properties

非 self-contained aggregate application 通过聚合 application 的 inbound/outbound 组件(通常是消息 channels)中的一个或两个绑定到外部 broker,而聚合 application 中的 applications 直接绑定。对于 example:源 application 的输出和处理器 application 的输入直接绑定,而处理器的输出 channel 绑定到 broker 的外部目标。传递包含 aggregate application 的 non-self 的 binding 服务 properties 时,需要将 binding 服务 properties 传递给聚合 application,而不是将它们设置为'args'到个别 child application。例如,

@SpringBootApplication
public class SampleAggregateApplication {

	public static void main(String[] args) {
		new AggregateApplicationBuilder()
			.from(SourceApplication.class).namespace("source").args("--fixedDelay=5000")
			.via(ProcessorApplication.class).namespace("processor1").args("--debug=true").run(args);
	}
}

--spring.cloud.stream.bindings.output.destination=processor-output这样的 binding properties 需要指定为外部 configuration properties 之一(cmdline arg 等)。