24. 介绍 Spring Cloud Stream

Spring Cloud Stream 是 building message-driven microservice applications 的 framework。 Spring Cloud Stream 构建在 Spring Boot 上,以创建独立的 production-grade Spring applications,并使用 Spring Integration 提供与消息代理的连接。它提供了来自几个供应商的中间件的自觉配置,介绍了持久性 publish-subscribe 语义,consumer 组和分区的概念。

您可以将@EnableBinding annotation 添加到 application 以立即连接到消息 broker,并且可以将@StreamListener添加到方法以使其接收 events 以进行流处理。以下是一个简单的 sink application,它接收外部消息。

@SpringBootApplication
@EnableBinding(Sink.class)
public class VoteRecordingSinkApplication {

  public static void main(String[] args) {
    SpringApplication.run(VoteRecordingSinkApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void processVote(Vote vote) {
      votingService.recordVote(vote);
  }
}

@EnableBinding annotation 将一个或多个接口作为参数(在本例中,参数是单个Sink接口)。接口声明输入 and/or 输出 channels。 Spring Cloud Stream 提供接口SourceSinkProcessor;您也可以定义自己的界面。

以下是Sink接口的定义:

public interface Sink {
  String INPUT = "input";

  @Input(Sink.INPUT)
  SubscribableChannel input();
}

@Input annotation 标识一个输入 channel,通过它接收的消息进入 application; @Output annotation 标识输出 channel,已发布的消息通过该输出离开 application。 @Input@Output 注释可以将 channel name 作为参数;如果未提供 name,则将使用带注释的方法的 name。

Spring Cloud Stream 将为您创建一个 implementation 接口。您可以通过自动装配在 application 中使用它,如下面的测试用例示例所示。

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)
@WebAppConfiguration
@DirtiesContext
public class StreamApplicationTests {

  @Autowired
  private Sink sink;

  @Test
  public void contextLoads() {
    assertNotNull(this.sink.input());
  }
}