36. 入门

要开始 creating Spring Cloud Stream applications,请访问Spring Initializr并创建一个名为“GreetingSource”的新 Maven 项目。 选择 Spring Boot {。 47}在下拉列表中。在“搜索依赖项”文本框中,键入Stream RabbitStream Kafka,具体取决于您要使用的 binder。

接下来,在与GreetingSourceApplication class 相同的包中创建一个新的 class,GreetingSource。给它以下 code:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;

@EnableBinding(Source.class)
public class GreetingSource {

    @InboundChannelAdapter(Source.OUTPUT)
    public String greet() {
        return "hello world " + System.currentTimeMillis();
    }
}

@EnableBinding annotation 是触发 Spring Integration 基础结构组件创建的原因。具体来说,它将创建一个 Kafka 连接工厂,一个 Kafka 出站 channel 适配器,以及在 Source 接口内定义的消息 channel:

public interface Source {

  String OUTPUT = "output";

  @Output(Source.OUTPUT)
  MessageChannel output();

}

auto-configuration 还会创建一个默认轮询器,以便每秒调用一次greet()方法。标准 Spring Integration @InboundChannelAdapter annotation 使用 return value 作为消息的有效负载向源的输出 channel 发送消息。

要 test-drive 此设置,运行 Kafka 消息 broker。一种简单的方法是使用 Docker 镜像:

# On OS X
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka

# On Linux
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka

Build application:

./mvnw clean package

consumer application 以类似的方式编码。返回 Initializr 并创建另一个名为 LoggingSink 的项目。然后在与 class LoggingSinkApplication相同的包中创建一个新的 class,LoggingSink,并使用以下 code:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class LoggingSink {

    @StreamListener(Sink.INPUT)
    public void log(String message) {
        System.out.println(message);
    }
}

Build application:

./mvnw clean package

要将 GreetingSource application 连接到 LoggingSink application,每个 application 必须共享相同的目标 name。启动两个 applications,如下所示,您将看到 consumer application 打印“hello world”和 console 的时间戳:

cd GreetingSource
java -jar target/GreetingSource-0.0.1-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=mydest

cd LoggingSink
java -jar target/LoggingSink-0.0.1-SNAPSHOT.jar --server.port=8090 --spring.cloud.stream.bindings.input.destination=mydest

(不同的服务器 port 可以防止用于为两个 applications.)中的 Spring Boot Actuator endpoints 提供服务的 HTTP port 的冲突

LoggingSink application 的输出类似于以下内容:

[           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8090 (http)
[           main] com.example.LoggingSinkApplication       : Started LoggingSinkApplication in 6.828 seconds (JVM running for 7.371)
hello world 1458595076731
hello world 1458595077732
hello world 1458595078733
hello world 1458595079734
hello world 1458595080735

36.1 在 CloudFoundry 上部署 Stream applications

在 CloudFoundry 上,服务通常通过名为VCAP_SERVICES的特殊环境变量公开。

配置 binder 连接时,可以使用环境变量中的值,如数据流 cloudfoundry 服务器 docs 中所述。