36. 入门
要开始 creating Spring Cloud Stream applications,请访问Spring Initializr并创建一个名为“GreetingSource”的新 Maven 项目。 选择 Spring Boot {。 47}在下拉列表中。在“搜索依赖项”文本框中,键入Stream Rabbit
或Stream Kafka
,具体取决于您要使用的 binder。
接下来,在与GreetingSourceApplication
class 相同的包中创建一个新的 class,GreetingSource
。给它以下 code:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
@EnableBinding(Source.class)
public class GreetingSource {
@InboundChannelAdapter(Source.OUTPUT)
public String greet() {
return "hello world " + System.currentTimeMillis();
}
}
@EnableBinding
annotation 是触发 Spring Integration 基础结构组件创建的原因。具体来说,它将创建一个 Kafka 连接工厂,一个 Kafka 出站 channel 适配器,以及在 Source 接口内定义的消息 channel:
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
auto-configuration 还会创建一个默认轮询器,以便每秒调用一次greet()
方法。标准 Spring Integration @InboundChannelAdapter
annotation 使用 return value 作为消息的有效负载向源的输出 channel 发送消息。
要 test-drive 此设置,运行 Kafka 消息 broker。一种简单的方法是使用 Docker 镜像:
# On OS X
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka
# On Linux
$ docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 spotify/kafka
Build application:
./mvnw clean package
consumer application 以类似的方式编码。返回 Initializr 并创建另一个名为 LoggingSink 的项目。然后在与 class LoggingSinkApplication
相同的包中创建一个新的 class,LoggingSink
,并使用以下 code:
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@EnableBinding(Sink.class)
public class LoggingSink {
@StreamListener(Sink.INPUT)
public void log(String message) {
System.out.println(message);
}
}
Build application:
./mvnw clean package
要将 GreetingSource application 连接到 LoggingSink application,每个 application 必须共享相同的目标 name。启动两个 applications,如下所示,您将看到 consumer application 打印“hello world”和 console 的时间戳:
cd GreetingSource
java -jar target/GreetingSource-0.0.1-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=mydest
cd LoggingSink
java -jar target/LoggingSink-0.0.1-SNAPSHOT.jar --server.port=8090 --spring.cloud.stream.bindings.input.destination=mydest
(不同的服务器 port 可以防止用于为两个 applications.)中的 Spring Boot Actuator endpoints 提供服务的 HTTP port 的冲突
LoggingSink application 的输出类似于以下内容:
[ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8090 (http)
[ main] com.example.LoggingSinkApplication : Started LoggingSinkApplication in 6.828 seconds (JVM running for 7.371)
hello world 1458595076731
hello world 1458595077732
hello world 1458595078733
hello world 1458595079734
hello world 1458595080735
36.1 在 CloudFoundry 上部署 Stream applications
在 CloudFoundry 上,服务通常通过名为VCAP_SERVICES的特殊环境变量公开。
配置 binder 连接时,可以使用环境变量中的值,如数据流 cloudfoundry 服务器 docs 中所述。