29. Programming Model

要了解编程模型,您应该熟悉以下核心概念:

  • 目标绑定器: 负责与外部消息传递系统集成的组件。

  • 目标绑定: 外部消息传递系统和应用程序之间的 bridge 梁(消息的生产者和 Consumer)(由目标绑定程序创建)。

  • 消息: 生产者和 Consumer 使用的规范数据结构,用于与目标绑定程序(以及通过外部消息传递系统进行的其他应用程序)进行通信。

SCSt overview

29.1 目标绑定

Destination Binders 是 Spring Cloud Stream 的扩展组件,负责提供必要的配置和实现以促进与外部消息传递系统的集成。这种集成负责连接性,委派以及与生产者和 Consumer 之间的消息的路由以及路由,数据类型转换,用户代码的调用等。

粘合剂要承担很多样板工作,否则这些工作就落在了您的肩膀上。但是,要实现这一点,Binder 仍然需要用户提供的一些简单但必需的指令集形式的帮助,这些指令集通常以某种类型的配置形式出现。

尽管讨论所有可用的 Binder 和绑定配置选项(本手册的其余部分都涉及它们)超出了本节的范围,但是目标绑定确实需要特别注意。下一节将详细讨论。

29.2 目标绑定

如前所述,“目标绑定”提供了外部消息传递系统与应用程序提供的“生产者”和“Consumer”之间的 bridge 梁。

将@EnableBinding 注解应用于应用程序的配置类之一可定义目标绑定。 @EnableBindingComments 本身使用@Configuration进行元 Comments,并触发 Spring Cloud Stream 基础结构的配置。

以下示例显示了一个功能完整且运行良好的 Spring Cloud Stream 应用程序,该应用程序从INPUT目标接收消息的有效负载作为String类型(请参见第 32 章,Content Type 协商部分),将其记录到控制台,并将其转换为OUTPUT后发送到OUTPUT目标大写。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication {

	public static void main(String[] args) {
		SpringApplication.run(MyApplication.class, args);
	}

	@StreamListener(Processor.INPUT)
	@SendTo(Processor.OUTPUT)
	public String handle(String value) {
		System.out.println("Received: " + value);
		return value.toUpperCase();
	}
}

如您所见,@EnableBinding注解可以将一个或多个接口类作为参数。这些参数称为* bindings ,它们包含表示 bindablecomponent *的方法。这些组件通常是基于通道的 Binder(例如 Rabbit,Kafka 等)的消息通道(请参阅Spring Messaging)。但是,其他类型的绑定可以为相应技术的本机功能提供支持。例如,Kafka StreamsBinder(以前称为 KStream)允许直接将本地绑定绑定到 Kafka Streams(有关更多详细信息,请参见Kafka Streams)。

Spring Cloud Stream 已经为典型的消息交换 Contract 提供了* binding *接口,其中包括:

  • 接收者: 通过提供消费消息的目的地来标识消息 Consumer 的 Contract。

  • 来源: 通过提供生成的消息发送到的目的地来标识消息生成者的 Contract。

  • 处理器: 通过暴露两个允许使用和产生消息的目的地,封装了接收器和源协定。

public interface Sink {

  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}
public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();
}
public interface Processor extends Source, Sink {}

尽管前面的示例满足了大多数情况,但是您还可以通过定义自己的绑定接口并使用@Input@Output注解来标识实际的可绑定组件来定义自己的 Contract。

For example:

public interface Barista {

    @Input
    SubscribableChannel orders();

    @Output
    MessageChannel hotDrinks();

    @Output
    MessageChannel coldDrinks();
}

将上一个示例中显示的接口用作@EnableBinding的参数,将分别触发三个绑定通道ordershotDrinkscoldDrinks的创建。

您可以根据需要提供尽可能多的绑定接口,作为@EnableBinding注解的参数,如以下示例所示:

@EnableBinding(value = { Orders.class, Payment.class })

在 Spring Cloud Stream 中,可绑定的MessageChannel组件是 Spring Messaging MessageChannel(用于出站)及其 extensionsSubscribableChannel(用于入站)。

可轮询的目标绑定

尽管前面描述的绑定支持基于事件的消息使用,但是有时您需要更多控制,例如使用率。

从 2.0 版开始,您现在可以绑定可轮询的使用者:

以下示例显示了如何绑定可轮询的使用者:

public interface PolledBarista {

    @Input
    PollableMessageSource orders();
	. . .
}

在这种情况下,PollableMessageSource的实现绑定到orders“通道”。有关更多详细信息,请参见第 29.3.5 节“使用受调查的 Consumer”

自定义 Channels 名称

通过使用@Input@Output注解,可以为 Channels 指定自定义 Channels 名称,如以下示例所示:

public interface Barista {
    @Input("inboundOrders")
    SubscribableChannel orders();
}

在前面的示例中,创建的绑定通道名为inboundOrders

通常,您无需直接访问单个通道或绑定(除非通过@EnableBindingComments 对其进行配置)。但是,您有时可能会遇到诸如测试或其他极端情况的情况。

除了为每个绑定生成通道并将其注册为 Spring Bean 之外,Spring Cloud Stream 还为每个绑定接口生成一个实现该接口的 bean。这意味着您可以通过在应用程序中自动接线来访问表示绑定或各个通道的接口,如以下两个示例所示:

自动线绑定界面

@Autowire
private Source source

public void sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
}

自动连线个别 Channels

@Autowire
private MessageChannel output;

public void sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
}

对于自定义通道名称或在需要特别命名通道的多通道方案中,您也可以使用标准 Spring 的@QualifierComments。

下面的示例演示如何以这种方式使用@Qualifier 注解:

@Autowire
@Qualifier("myChannel")
private MessageChannel output;

29.3 产生和使用消息

您可以使用 Spring IntegrationComments 或 Spring Cloud Stream 本机 Comments 编写 Spring Cloud Stream 应用程序。

29.3.1 Spring 集成支持

Spring Cloud Stream 构建在企业整合模式定义的概念和模式的基础上,并依靠其内部实现依赖于 Spring 项目组合Spring Integration框架中已经构建并流行的企业集成模式实现。

因此,它自然支持 Spring Integration 已经构建的基础,语义和配置选项。

例如,您可以将Source的输出通道附加到MessageSource并使用熟悉的@InboundChannelAdapterComments,如下所示:

@EnableBinding(Source.class)
public class TimerSource {

  @Bean
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
  public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
  }
}

同样,可以在为* Processor *绑定协定提供消息处理程序方法的实现时,使用@Transformer 或@ServiceActivator,如以下示例所示:

@EnableBinding(Processor.class)
public class TransformProcessor {
  @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
  public Object transform(String message) {
    return message.toUpperCase();
  }
}

Note

尽管这可能会略过一些,但重要的是要了解,当您使用@StreamListenerComments 从同一绑定进行消费时,将使用 pub-sub 模型。每个带有@StreamListenerComments 的方法都会收到自己的消息副本,并且每个都有自己的使用者组。但是,如果您通过使用 Spring Integration 注解(例如@Aggregator@Transformer@ServiceActivator)之一从同一绑定中消费,则这些消费在竞争模型中。没有为每个订阅创建单独的 Consumer 组。

29.3.2 使用@StreamListener 注解

作为对其 Spring Integration 支持的补充,Spring Cloud Stream 提供了自己的@StreamListenerComments(以其他 Spring MessagingComments(@MessageMapping@JmsListener@RabbitListener等)为模型,并提供了便利,例如基于内容的路由等。

@EnableBinding(Sink.class)
public class VoteHandler {

  @Autowired
  VotingService votingService;

  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}

与其他 Spring Messaging 方法一样,方法参数可以用@Payload@Headers@HeaderComments。

对于返回数据的方法,必须使用@SendTo注解为该方法返回的数据指定输出绑定目标,如以下示例所示:

@EnableBinding(Processor.class)
public class TransformProcessor {

  @Autowired
  VotingService votingService;

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}

29.3.3 使用@StreamListener 进行基于内容的路由

Spring Cloud Stream 支持根据条件将消息调度到以@StreamListenerComments 的多个处理程序方法。

为了有资格支持条件分派,一种方法必须满足以下条件:

  • 它不能返回值。

  • 它必须是单独的消息处理方法(不支持 ReactiveAPI 方法)。

该条件由 Comments 的condition参数中的 SpEL 表达式指定,并针对每条消息进行评估。所有与条件匹配的处理程序都在同一线程中调用,并且不必假设调用的 Sequences。

在具有分配条件的@StreamListener的以下示例中,所有带有 Headerstype且值为bogey的消息均被分配到receiveBogey方法,所有带有 Headerstype且具有bacall值的消息均被分配到receiveBacall方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}

condition上下文中的 Content Type 协商

使用@StreamListenercondition参数了解基于内容的路由背后的一些机制很重要,尤其是在整个消息类型的上下文中。如果您在 continue 之前熟悉第 32 章,Content Type 协商,也可能会有所帮助。

请考虑以下情形:

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class CatsAndDogs {

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Dog'")
    public void bark(Dog dog) {
       // handle the message
    }

    @StreamListener(target = Sink.INPUT, condition = "payload.class.simpleName=='Cat'")
    public void purr(Cat cat) {
       // handle the message
    }
}

前面的代码是完全有效的。它可以毫无问题地进行编译和部署,但是永远不会产生您期望的结果。

这是因为您正在测试的东西在您期望的状态下尚不存在。这是因为消息的有效负载尚未从有线格式(byte[])转换为所需的类型。换句话说,它尚未经过第 32 章,Content Type 协商中描述的类型转换过程。

因此,除非您使用 SPeL 表达式来评估原始数据(例如,字节数组中第一个字节的值),否则请使用基于消息 Headers 的表达式(例如condition = "headers['type']=='dog'")。

Note

目前,仅基于通道的绑定程序(不支持反应式编程)支持通过@StreamListener条件进行分派。

29.3.4 Spring Cloud Function 支持

从 Spring Cloud Stream v2.1 开始,定义* stream handlers sources *的另一种选择是对Spring 云功能使用内置支持,其中它们可以表示为java.util.function.[Supplier/Function/Consumer]类型的 bean。

要指定哪个功能 bean 绑定到绑定公开的外部目标,必须提供spring.cloud.stream.function.definition属性。

这是 Processor 应用程序将消息处理程序公开为java.util.function.Function的示例

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class, "--spring.cloud.stream.function.definition=toUpperCase");
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在上面的代码中,我们仅定义了一个类型为java.util.function.Function的 bean,称为* toUpperCase *,并将其标识为用作消息处理程序的 bean,其“Importing”和“输出”必须绑定到 Processor 绑定公开的外部目标。

以下是支持源,处理器和接收器的简单功能应用程序的示例。

这是定义为java.util.function.Supplier的源应用程序的示例

@SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
	public static void main(String[] args) {
		SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
	}
	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

这是定义为java.util.function.Function的 Processor 应用程序的示例

@SpringBootApplication
@EnableBinding(Processor.class)
public static class ProcessorFromFunction {
	public static void main(String[] args) {
		SpringApplication.run(ProcessorFromFunction.class, "--spring.cloud.stream.function.definition=toUpperCase");
	}
	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

这是一个定义为java.util.function.Consumer的接收器应用程序的示例

@EnableAutoConfiguration
@EnableBinding(Sink.class)
public static class SinkFromConsumer {
	public static void main(String[] args) {
		SpringApplication.run(SinkFromConsumer.class, "--spring.cloud.stream.function.definition=sink");
	}
	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}

Functional Composition

使用此编程模型,您还可以从功能组合中受益,在该功能组合中,您可以从一组简单的函数中动态组成复杂的处理程序。作为示例,让我们将以下功能 bean 添加到上面定义的应用程序中

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

并修改spring.cloud.stream.function.definition属性,以反映您打算从'toUpperCase'和'wrapInQuotes'编写新函数的意图。为此,Spring Cloud Function 允许您使用|(竖线)符号。因此,完成我们的示例,我们的属性现在将如下所示:

—spring.cloud.stream.function.definition=toUpperCase|wrapInQuotes

29.3.5 使用轮询的使用者

Overview

当使用轮询的使用者时,您可以按需轮询PollableMessageSource。考虑以下受调查 Consumer 的示例:

public interface PolledConsumer {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

给定上一示例中的受调查 Consumer,您可以按以下方式使用它:

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

PollableMessageSource.poll()方法采用MessageHandler参数(通常为 lambda 表达式,如此处所示)。如果收到并成功处理了消息,它将返回true

与消息驱动的使用者一样,如果MessageHandler引发异常,则消息将发布到错误通道,如“ ???”中所述。

通常,MessageHandler退出时,poll()方法会确认该消息。如果该方法异常退出,则消息被拒绝(不重新排队),但请参见称为“处理错误”的部分。您可以通过对确认负责来覆盖该行为,如以下示例所示:

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}

Tip

您必须在某个时候ack(或nack)消息,以避免资源泄漏。

Tip

一些消息传递系统(例如 Apache Kafka)在日志中维护简单的偏移量。如果传递失败并使用StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);重新排队,则重新传递任何以后成功确认的消息。

还有一个重载的poll方法,其定义如下:

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type是一个转换提示,它允许转换传入的消息有效负载,如以下示例所示:

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});

Handling Errors

默认情况下,为可轮询源配置了一个错误通道。如果回调引发异常,则将ErrorMessage发送到错误通道(<destination>.<group>.errors);该错误通道还 bridge 到全局 Spring Integration errorChannel

您可以使用@ServiceActivator订阅任何一个错误通道来处理错误;如果没有订阅,则将仅记录错误并确认消息成功。如果错误通道服务激活器抛出异常,则该消息将被拒绝(默认情况下),并且不会重新发送。如果服务激活器抛出RequeueCurrentMessageException,则消息将在代理处重新排队,并在随后的轮询中再次检索。

如果侦听器直接抛出RequeueCurrentMessageException,则消息将如上所述重新排队,并且不会发送到错误通道。

29.4 错误处理

错误会发生,Spring Cloud Stream 提供了几种灵活的机制来处理它们。错误处理有两种形式:

  • 应用程序: 错误处理是在应用程序(自定义错误处理程序)中完成的。

  • 系统: 错误处理委托给联编程序(重新排队,DL 和其他)。注意,这些技术取决于绑定程序的实现和底层消息传递中间件的功能。

Spring Cloud Stream 使用Spring Retry库来促进成功的消息处理。有关更多详细信息,请参见第 29.4.3 节“重试模板”。但是,当所有方法均失败时,消息处理程序引发的异常将传播回绑定程序。那时,Binder 调用自定义错误处理程序或将错误传达回消息传递系统(重新排队,DLQ 等)。

29.4.1 应用程序错误处理

有两种类型的应用程序级错误处理。可以在每个绑定订阅中处理错误,或者全局处理程序可以处理所有绑定订阅错误。让我们查看详细信息。

图 29.1. 具有自定义和全局错误处理程序的 Spring Cloud Stream Sink 应用程序

自定义与全局错误通道

对于每个 Importing 绑定,Spring Cloud Stream 创建具有以下语义<destinationName>.errors的专用错误通道。

Note

<destinationName>由绑定的名称(例如input)和组的名称(例如myGroup)组成。

考虑以下:

spring.cloud.stream.bindings.input.group=myGroup
@StreamListener(Sink.INPUT) // destination name 'input.myGroup'
public void handle(Person value) {
	throw new RuntimeException("BOOM!");
}

@ServiceActivator(inputChannel = Processor.INPUT + ".myGroup.errors") //channel name 'input.myGroup.errors'
public void error(Message<?> message) {
	System.out.println("Handling ERROR: " + message);
}

在前面的示例中,目标名称为input.myGroup,专用错误通道名称为input.myGroup.errors

Note

@StreamListener 注解的使用专门用于定义 bridge 内部通道和外部目标的绑定。鉴于目标特定错误通道没有关联的外部目标,因此该通道是 Spring Integration(SI)的特权。这意味着必须使用 SI 处理程序 Comments 之一(即@ ServiceActivator,@ Transformer 等)定义用于此类目标的处理程序。

Note

如果未指定group,则使用匿名组(类似于input.anonymous.2K37rb06Q6m2r51-SPIDDQ),这不适用于错误处理方案,因为在创建目标之前,您不知道它将是什么。

另外,如果您绑定到现有目的地,例如:

spring.cloud.stream.bindings.input.destination=myFooDestination
spring.cloud.stream.bindings.input.group=myGroup

完整的目标名称为myFooDestination.myGroup,然后专用错误通道名称为myFooDestination.myGroup.errors

回到示例...

订阅名为input的 Channels 的handle(..)方法引发异常。给定错误信道input.myGroup.errors的一个订阅者,所有错误消息均由该订阅者处理。

如果您有多个绑定,则可能需要一个错误处理程序。 Spring Cloud Stream 通过将每个单独的错误通道 bridge 到名为errorChannel的通道来自动提供对* global error channel *的支持,从而允许单个订阅者处理所有错误,如以下示例所示:

@StreamListener("errorChannel")
public void error(Message<?> message) {
	System.out.println("Handling ERROR: " + message);
}

如果错误处理逻辑相同,则与哪个处理程序产生错误无关,这可能是一个方便的选择。

29.4.2 系统错误处理

系统级错误处理意味着将错误传递回消息传递系统,并且鉴于并非每个消息传递系统都相同,因此各个粘合剂的功能可能有所不同。

就是说,在本节中,我们解释了系统级错误处理背后的一般思想,并以 Rabbit 绑定程序为例。注意:尽管某些配置属性确实有所不同,但 KafkaBinder 提供了类似的支持。另外,有关更多详细信息和配置选项,请参见各个 Binder 的文档。

如果未配置内部错误处理程序,则错误将传播到绑定程序,而绑定程序随后会将这些错误传播回消息传递系统。根据消息传递系统的功能,这样的系统可能丢弃消息,重新排队消息以进行重新处理或*将失败的消息发送给 DLQ *。 Rabbit 和 Kafka 都支持这些概念。但是,其他联编程序可能没有,因此请参阅您单独的联编程序的文档,以获取有关受支持的系统级错误处理选项的详细信息。

删除失败的消息

默认情况下,如果未提供其他系统级配置,则消息传递系统将丢弃失败的消息。虽然在某些情况下可以接受,但在大多数情况下是不可接受的,我们需要一些恢复机制来避免消息丢失。

DLQ-死信队列

DLQ 允许将失败的消息发送到特殊目的地:-* Dead Letter Queue *。

配置后,失败的消息将发送到此目标,以进行后续的重新处理或审核与对帐。

例如,continue 前面的示例并使用 RabbitBinder 设置 DLQ,您需要设置以下属性:

spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true

请记住,在上面的属性中,input对应于 Importing 目标绑定的名称。 consumer表示它是使用者属性,而auto-bind-dlq指示绑定程序为input目标配置 DLQ,这将导致另外一个 Rabbit 队列input.myGroup.dlq

配置完成后,所有失败的消息都会通过错误消息路由到此队列,类似于以下内容:

delivery_mode:	1
headers:
x-death:
count:	1
reason:	rejected
queue:	input.hello
time:	1522328151
exchange:
routing-keys:	input.myGroup
Payload {"name":"Bob"}

从上面可以看到,原始消息会保留下来以供进一步操作。

但是,您可能已经注意到的一件事是,有关消息处理的原始问题的信息有限。例如,您看不到与原始错误相对应的堆栈跟踪。要获取有关原始错误的更多相关信息,您必须设置一个附加属性:

spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=true

这样做会强制内部错误处理程序拦截错误消息并向其添加其他信息,然后再将其发布到 DLQ。配置完成后,您可以看到该错误消息包含与原始错误有关的更多信息,如下所示:

delivery_mode:	2
headers:
x-original-exchange:
x-exception-message:	has an error
x-original-routingKey:	input.myGroup
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload {"name":"Bob"}

这有效地结合了应用程序级和系统级的错误处理,以进一步协助下游故障排除机制。

重新排队失败的消息

如前所述,当前支持的 Binder(Rabbit 和 Kafka)依靠RetryTemplate来促进成功的消息处理。有关详情,请参见第 29.4.3 节“重试模板”。但是,对于max-attempts属性设置为 1 的情况,将禁用消息的内部重新处理。此时,您可以通过指示消息传递系统重新排队失败的消息来促进消息的重新处理(重试)。重新排队后,失败的消息将被发送回原始处理程序,从而创建一个重试循环。

如果错误的性质与某些资源的偶发性但短期不可用有关,则此选项可能是可行的。

为此,必须设置以下属性:

spring.cloud.stream.bindings.input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

在前面的示例中,max-attempts设置为 1 实质上禁用了内部重试,而requeue-rejected(* requeue rejected messages *的缩写)设置为true。设置后,失败的消息将重新提交给相同的处理程序,并连续循环,直到处理程序抛出AmqpRejectAndDontRequeueException为止,这实际上使您可以在处理程序本身内构建自己的重试逻辑。

29.4.3 重试模板

RetryTemplateSpring Retry库的一部分。尽管涵盖了RetryTemplate的所有功能超出了本文档的范围,但我们将提及以下与RetryTemplate特别相关的使用者属性:

  • maxAttempts

    • 处理消息的尝试次数。

Default: 3.

  • backOffInitialInterval

    • 重试时的退避初始间隔。

默认值 1000 毫秒。

  • backOffMaxInterval

    • 最大退避间隔。

默认值 10000 毫秒。

  • backOffMultiplier

    • 退避乘数。

Default 2.0.

  • defaultRetryable

    • retryableExceptions中未列出的侦听器引发的异常是否可以重试。

默认值:true

  • retryableExceptions

    • 键中 Throwable 类名称的 Map,值中布尔值的 Map。指定将要重试的那些异常(和子类)。另请参见defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

Default: empty.

尽管上述设置足以满足大多数自定义要求,但它们可能无法满足某些复杂的要求,此时您可能需要提供自己的RetryTemplate实例。为此,将其配置为应用程序配置中的 Bean。应用程序提供的实例将覆盖框架提供的实例。另外,为避免冲突,您必须将绑定器要使用的RetryTemplate的实例限定为@StreamRetryTemplate。例如,

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

从上面的示例中可以看到,您不需要用@Bean对其进行 Comments,因为@StreamRetryTemplate是合格的@Bean

29.5 反应式编程支持

Spring Cloud Stream 还支持使用反应式 API,将传入和传出的数据作为连续的数据流进行处理。可通过spring-cloud-stream-reactive获得对反应式 API 的支持,需要将其显式添加到您的项目中。

具有响应式 API 的编程模型是声明性的。您可以使用描述从入站数据流到出站数据流的功能转换的运算符,而不是指定每个消息的处理方式。

目前,Spring Cloud Stream 仅支持Reactor API。将来,我们打算支持基于反应式流的更通用的模型。

反应式编程模型还使用@StreamListenerComments 来设置反应式处理程序。区别在于:

  • @StreamListenerComments 不能指定 Importing 或输出,因为它们作为参数提供并从方法返回值。

  • 该方法的参数必须用@Input@OutputComments,分别指示传入和传出数据流连接到哪个 Importing 或输出。

  • 方法的返回值(如果有的话)以@OutputComments,指示应将数据发送到的 Importing。

Note

响应式编程支持需要 Java 1.8.

Note

从 Spring Cloud Stream 1.1.1 及更高版本(从发布系列 Brooklyn.SR2 开始)开始,反应式编程支持要求使用 Reactor 3.0.4.RELEASE 及更高版本。不支持早期版本的 Reactor(包括 3.0.1.RELEASE,3.0.2.RELEASE 和 3.0.3.RELEASE)。 spring-cloud-stream-reactive可传递地检索正确的版本,但是项目结构可以将io.projectreactor:reactor-core的版本 Management 为较早的版本,尤其是在使用 Maven 时。对于使用 Spring Initializr 和 Spring Boot 1.x 生成的项目就是这种情况,该项目将 Reactor 版本覆盖为2.0.8.RELEASE。在这种情况下,您必须确保发布了正确版本的工件。您可以通过将对io.projectreactor:reactor-core的直接依赖项(版本为3.0.4.RELEASE或更高版本)添加到项目中来实现。

Note

当前,术语“反应式”的使用是指正在使用的反应式 API,而不是指执行模型是反应式的(也就是说,绑定的端点仍使用“推”式而非“拉式”模型)。尽管通过使用 Reactor 提供了一些反压支持,但我们确实打算在将来的版本中通过将本机 ReactiveClient 端用于连接的中间件来完全支持 Reactive 管道。

29.5.1 基于 Reactor 的处理程序

基于 Reactor 的处理程序可以具有以下参数类型:

  • 对于带有@InputComments 的参数,它支持 Reactor Flux类型。入站 Flux 的参数化遵循与处理单个消息时相同的规则:可以是整个Message,可以是Message负载的 POJO,也可以是基于Message内容进行转换的结果的 POJO。类型的 Headers。提供了多个 Importing。

  • 对于带有OutputComments 的参数,它支持FluxSender类型,该类型将方法生成的Flux与输出连接。一般而言,仅当该方法可以具有多个输出时才建议将输出指定为参数。

基于 Reactor 的处理程序支持Flux的返回类型。在这种情况下,必须使用@Output进行 Comments。当单个输出Flux可用时,建议使用该方法的返回值。

以下示例显示了基于 Reactor 的Processor

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  @Output(Processor.OUTPUT)
  public Flux<String> receive(@Input(Processor.INPUT) Flux<String> input) {
    return input.map(s -> s.toUpperCase());
  }
}

使用输出参数的同一处理器看起来像以下示例:

@EnableBinding(Processor.class)
@EnableAutoConfiguration
public static class UppercaseTransformer {

  @StreamListener
  public void receive(@Input(Processor.INPUT) Flux<String> input,
     @Output(Processor.OUTPUT) FluxSender output) {
     output.send(input.map(s -> s.toUpperCase()));
  }
}

29.5.2Reactive 来源

Spring Cloud StreamReactive 支持还提供了通过@StreamEmitterComments 创建 Reactive 源的功能。通过使用@StreamEmitter注解,可以将常规源转换为被动源。 @StreamEmitter是方法级别的 Comments,用于将方法标记为@EnableBinding声明的输出的 Launcher。不能将@InputComments 与@StreamEmitter一起使用,因为用此 Comments 标记的方法不会监听任何 Importing。而是用@StreamEmitter标记的方法生成输出。遵循@StreamListener中使用的相同编程模型,@StreamEmitter还允许使用@Output注解的灵活方式,具体取决于方法是否具有任何参数,返回类型和其他考虑因素。

本节的其余部分包含使用各种样式的@StreamEmitterComments 的示例。

以下示例每毫秒发出一次Hello, World消息,并发布到 Reactor Flux

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  public Flux<String> emit() {
    return Flux.intervalMillis(1)
            .map(l -> "Hello World");
  }
}

在前面的示例中,Flux中的结果消息被发送到Source的输出通道。

下一个示例是发送 Reactor Flux@StreamEmmitter的另一种形式。以下方法代替返回Flux而不是返回Flux,以编程方式从源发送Flux

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  public void emit(FluxSender output) {
    output.send(Flux.intervalMillis(1)
            .map(l -> "Hello World"));
  }
}

下一个示例在功能和样式上与上述代码段完全相同。但是,它没有在方法上使用显式的@OutputComments,而是在 method 参数上使用了 Comments。

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  public void emit(@Output(Source.OUTPUT) FluxSender output) {
    output.send(Flux.intervalMillis(1)
            .map(l -> "Hello World"));
  }
}

本节的最后一个示例是使用 Reactive Streams Publisher API 并利用Spring Integration Java DSL中对此功能的支持来编写反应源的另一种形式。以下示例中的Publisher仍在幕后使用 Reactor Flux,但是,从应用程序角度看,这对用户是透明的,并且只需要 Reactive Streams 和 Java DSL 即可进行 Spring 集成:

@EnableBinding(Source.class)
@EnableAutoConfiguration
public static class HelloWorldEmitter {

  @StreamEmitter
  @Output(Source.OUTPUT)
  @Bean
  public Publisher<Message<String>> emit() {
    return IntegrationFlows.from(() ->
                new GenericMessage<>("Hello World"),
        e -> e.poller(p -> p.fixedDelay(1)))
        .toReactivePublisher();
  }
}