24. 介绍 Spring Cloud Stream
Spring Cloud Stream 是 building message-driven microservice applications 的 framework。 Spring Cloud Stream 构建在 Spring Boot 上,以创建独立的 production-grade Spring applications,并使用 Spring Integration 提供与消息代理的连接。它提供了来自几个供应商的中间件的自觉配置,介绍了持久性 publish-subscribe 语义,consumer 组和分区的概念。
您可以将@EnableBinding
annotation 添加到 application 以立即连接到消息 broker,并且可以将@StreamListener
添加到方法以使其接收 events 以进行流处理。以下是一个简单的 sink application,它接收外部消息。
@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {
public static void main(String[] args) {
SpringApplication.run(VoteRecordingSinkApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void processVote(Vote vote) {
votingService.recordVote(vote);
}
}
@EnableBinding
annotation 将一个或多个接口作为参数(在本例中,参数是单个Sink
接口)。接口声明输入 and/or 输出 channels。 Spring Cloud Stream 提供接口Source
,Sink
和Processor
;您也可以定义自己的界面。
以下是Sink
接口的定义:
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
@Input
annotation 标识一个输入 channel,通过它接收的消息进入 application; @Output
annotation 标识输出 channel,已发布的消息通过该输出离开 application。 @Input
和@Output
注释可以将 channel name 作为参数;如果未提供 name,则将使用带注释的方法的 name。
Spring Cloud Stream 将为您创建一个 implementation 接口。您可以通过自动装配在 application 中使用它,如下面的测试用例示例所示。
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class StreamApplicationTests {
@Autowired
private Sink sink;
@Test
public void contextLoads() {
assertNotNull(this.sink.input());
}
}