25. 介绍 Spring Cloud Stream
Spring Cloud Stream 是 building message-driven microservice applications 的 framework。 Spring Cloud Stream 构建在 Spring Boot 上以创建独立的 production-grade Spring applications 并使用 Spring Integration 来提供与消息代理的连接。它提供了来自几个供应商的中间件的自觉配置,介绍了持久性 publish-subscribe 语义,consumer 组和分区的概念。
您可以将@EnableBinding
annotation 添加到 application 以立即连接到消息 broker,并且可以将@StreamListener
添加到方法以使其接收 events 以进行流处理。以下 example 显示了一个接收外部消息的 sink application:
@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(VoteRecordingSinkApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void processVote(Vote vote) {
votingService.recordVote(vote);
}
}
@EnableBinding
annotation 将一个或多个接口作为参数(在本例中,参数是单个Sink
接口)。接口声明输入和输出 channels。 Spring Cloud Stream 提供Source
,Sink
和Processor
接口。您还可以定义自己的界面。
以下清单显示了Sink
接口的定义:
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
@Input
annotation 标识输入 channel,接收的消息通过该输入进入 application。 @Output
annotation 标识输出 channel,已发布的消息通过该输出离开 application。 @Input
和@Output
注释可以将 channel name 作为参数。如果未提供 name,则使用带注释方法的 name。
Spring Cloud Stream 为您创建接口的 implementation。您可以通过自动装配在 application 中使用它,如下面的 example(来自测试用例)所示:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class StreamApplicationTests {
@Autowired
private Sink sink;
@Test
public void contextLoads() {
assertNotNull(this.sink.input());
}
}