38. Apache Kafka Streams Binder

38.1 用法

要使用 Kafka Streams binder,只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream application:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

38.2 Kafka Streams Binder 概述

Spring Cloud Stream 的 Apache Kafka 支持还包括为 Apache Kafka Streams binding 明确设计的 binder implementation。通过这个本机 integration,Spring Cloud Stream“处理器”application 可以直接在核心业务逻辑中使用Apache Kafka Streams API。

Kafka Streams binder implementation 构建于在 Spring Kafka 中的 Kafka Streams项目提供的基础之上。

作为本机 integration 的一部分,Kafka Streams API 提供的 high-level Streams DSL也可用于业务逻辑。

也可以获得处理器 API支持的早期 version。

如前所述 early-on,Kafka Streams 支持 Spring Cloud Stream 严格只能在 Processor model 中使用。一个 model,其中可以应用从入站 topic 读取的消息,业务处理,并且可以将转换后的消息写入出站 topic。它也可以在带有 no-outbound 目标的 Processor applications 中使用。

38.2.1 Streams DSL

此 application 使用来自 Kafka topic(e.g. ,words)的数据,计算 5 秒 time 窗口中每个唯一字的字数,并将计算结果发送到下游 topic(e.g. ,counts)以进行进一步处理。

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {

	@StreamListener("input")
	@SendTo("output")
	public KStream<?, WordCount> process(KStream<?, String> input) {
		return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}

一旦构建为 uber-jar(e.g. ,wordcount-processor.jar),您就可以运行上面的 example,如下所示。

java -jar wordcount-processor.jar  --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts

此 application 将使用来自 Kafka topic words的消息,并将计算结果发布到输出 topic counts

Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream objects。作为开发人员,您可以专注于 code,i.e 的业务方面。写入处理器所需的逻辑。设置 Kafka Streams 基础结构所需的 Streams DSL 特定配置由 framework 自动处理。

38.3 Configuration 选项

本节包含 Kafka Streams binder 使用的 configuration 选项。

有关 common configuration 选项和与 binder 相关的 properties,请参阅核心文件

38.3.1 Kafka Streams Properties

binder level 中提供了以下 properties,并且必须以spring.cloud.stream.kafka.streams.binder. literal 为前缀。

  • 组态

    • Map 包含 key/value 对,包含与 Apache Kafka Streams API 相关的 properties。此 property 必须以spring.cloud.stream.kafka.streams.binder.为前缀。以下是使用此 property 的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能进入流 configuration 的所有 properties 的更多信息,请参阅 Apache Kafka Streams docs 中的 StreamsConfig JavaDocs。

  • 经纪商

    • Broker URL

默认值:localhost

  • zkNodes

    • Zookeeper URL

默认值:localhost

  • serdeError

    • 反序列化错误处理程序类型。可能的值为 - logAndContinuelogAndFailsendToDlq

默认值:logAndFail

  • applicationID

  • 当前 application context 中所有流配置的应用程序 ID。您可以使用 binding 上的group property 覆盖单个StreamListener方法的 application id。在相同方法的多个输入的情况下,您必须确保对所有输入绑定使用相同的 group name。

默认值:default

以下 properties 仅适用于 Kafka Streams 生成器,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. literal 为前缀。

  • keySerde

    • key serde 使用

默认值:none

  • valueSerde

    • value serde 使用

默认值:none

  • useNativeEncoding

    • flag 启用本机编码

默认值:false

以下 properties 仅适用于 Kafka Streams 使用者,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer. literal 为前缀。

  • keySerde

    • key serde 使用

默认值:none

  • valueSerde

    • value serde 使用

默认值:none

  • materializedAs

    • state store 在使用传入的 KTable 类型时实现

默认值:none

  • useNativeDecoding

    • flag 启用本机解码

默认值:false

  • dlqName

    • DLQ topic name。

默认值:none

38.3.2 TimeWindow properties:

窗口化是流处理应用程序中的一个重要概念。以下 properties 可用于配置 time-window 计算。

  • spring.cloud.stream.kafka.streams.timeWindow.length

    • 当给出 property 时,您可以将TimeWindows bean 自动装入 application。 value 以毫秒表示。

默认值:none

  • spring.cloud.stream.kafka.streams.timeWindow.advanceBy

    • Value 以毫秒为单位。

默认值:none

38.4 多输入绑定

对于需要多个传入 KStream objects 或 KStream 和 KTable objects 组合的用例,Kafka Streams binder 提供多个绑定支持。

让我们看看它的实际效果。

38.4.1 多个输入绑定为 Sink

@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
                    @Input("inputTable") KTable<Long, Song> songTable) {
                    ....
                    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream<?, ?> inputStream();

    @Input("inputTable")
    KTable<?, ?> inputTable();
}

在上面的示例中,application 被写为 sink,i.e。没有输出绑定,application 必须决定下游处理。当您以此样式编写 applications 时,您可能希望向下游发送信息或将它们存储在 state store 中(请参阅下面的 Queryable State Stores)。

在传入 KTable 的情况下,如果要将计算实现为 state store,则必须通过以下 property 表达它。

spring.cloud.stream.kafka.streams.bindings.inputTable.consumer.materializedAs: all-songs

38.4.2 多输入绑定作为处理器

@EnableBinding(KStreamKTableBinding.class)
....
....

@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
                                     @Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}

interface KStreamKTableBinding extends KafkaStreamsProcessor {

    @Input("inputX")
    KTable<?, ?> inputTable();
}

38.5 多输出绑定(又称分支)

Kafka Streams 允许基于某些谓词将出站数据拆分为多个主题。 Kafka Streams binder 支持此 feature,而不会影响最终用户 application 中通过StreamListener公开的编程 model。

您可以按照常规方式编写 application,如上面单词 count example 中所示。但是,在使用分支 feature 时,您需要做一些事情。首先,您需要确保 return 类型是KStream[]而不是常规KStream。其次,您需要使用包含 order 中输出绑定的SendTo annotation(请参阅下面的 example)。对于每个输出绑定,您需要配置 destination,content-type 等,符合标准 Spring Cloud Stream 预期。

这是一个 example:

@EnableBinding(KStreamProcessorWithBranches.class)
@EnableAutoConfiguration
public static class WordCountProcessorApplication {

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo({"output1","output2","output3})
    public KStream<?, WordCount>[] process(KStream<Object, String> input) {

			Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
			Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
			Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

			return input
					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
					.groupBy((key, value) -> value)
					.windowedBy(timeWindows)
					.count(Materialized.as("WordCounts-1"))
					.toStream()
					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
					.branch(isEnglish, isFrench, isSpanish);
    }

    interface KStreamProcessorWithBranches {

    		@Input("input")
    		KStream<?, ?> input();

    		@Output("output1")
    		KStream<?, ?> output1();

    		@Output("output2")
    		KStream<?, ?> output2();

    		@Output("output3")
    		KStream<?, ?> output3();
    	}
}

properties:

spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/json
spring.cloud.stream.bindings.output3.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output1:
  destination: foo
  producer:
    headerMode: raw
spring.cloud.stream.bindings.output2:
  destination: bar
  producer:
    headerMode: raw
spring.cloud.stream.bindings.output3:
  destination: fox
  producer:
    headerMode: raw
spring.cloud.stream.bindings.input:
  destination: words
  consumer:
    headerMode: raw

38.6 消息转换

与基于 message-channel 的 binder applications 类似,Kafka Streams binder 适应 out-of-the-box content-type 转换而不会有任何妥协。

Kafka Streams 操作通常会知道用于正确转换 key 和 value 的 SerDe 的类型。因此,在入站和出站转换中依赖 Apache Kafka Streams library 本身提供的 SerDe 工具可能更自然,而不是使用 framework 提供的 content-type 转换。另一方面,您可能已经熟悉 framework 提供的 content-type 转换模式,并且您希望继续使用入站和出站转换。

Kafka Streams binder implementation 支持这两个选项。

38.6.1 出站序列化

如果禁用本机编码(这是默认设置),则 framework 将使用用户设置的 contentType 转换消息(否则,将应用默认的application/json)。在这种情况下,它将忽略出站序列化的出站上的任何 SerDe 设置。

这是在出站上设置 contentType 的 property。

spring.cloud.stream.bindings.output.contentType: application/json

这是启用本机编码的 property。

spring.cloud.stream.bindings.output.nativeEncoding: true

如果在输出 binding 上启用了本机编码(用户必须如上所述启用它),那么 framework 将跳过出站的任何形式的自动消息转换。在这种情况下,它将切换到用户设置的 Serde。将使用在实际输出 binding 上设置的valueSerde property。这是一个 example。

spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde

如果未设置此 property,则它将使用“默认”SerDe:spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

值得一提的是 Kafka Streams binder 不会在出站时序列化密钥 - 它只依赖于 Kafka 本身。因此,您必须在 binding 上指定keySerde property,否则它将默认为 application-wide common keySerde

Binding level key serde:

spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde

Common Key serde:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde

如果使用分支,则需要使用多个输出绑定。例如,

interface KStreamProcessorWithBranches {

    		@Input("input")
    		KStream<?, ?> input();

    		@Output("output1")
    		KStream<?, ?> output1();

    		@Output("output2")
    		KStream<?, ?> output2();

    		@Output("output3")
    		KStream<?, ?> output3();
    	}

如果设置了nativeEncoding,则可以在单个输出绑定上设置不同的 SerDe,如下所示。

spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde

然后如果你有这样的SendTo,@SendTo({“output1”,“output2”,“output3”}),分支中的KStream[]将被应用适当的 SerDe objects,如上所定义。如果未启用nativeEncoding,则可以在输出绑定上设置不同的 contentType 值,如下所示。在这种情况下,framework 将使用适当的消息转换器在发送到 Kafka 之前转换消息。

spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/java-serialzied-object
spring.cloud.stream.bindings.output3.contentType: application/octet-stream

38.6.2 入站反序列化

类似的规则适用于入站数据反序列化。

如果禁用本机解码(这是默认设置),则 framework 将使用用户设置的 contentType 转换消息(否则,将应用默认的application/json)。在这种情况下,它将忽略入站反序列化的入站上的任何 SerDe 集。

这是在入站上设置 contentType 的 property。

spring.cloud.stream.bindings.input.contentType: application/json

这是启用本机解码的 property。

spring.cloud.stream.bindings.input.nativeDecoding: true

如果在输入 binding 上启用了本机解码(用户必须如上所述明确启用它),那么 framework 将跳过对入站进行任何消息转换。在这种情况下,它将切换到用户设置的 SerDe。将使用在实际输出 binding 上设置的valueSerde property。这是一个 example。

spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde

如果未设置此 property,它将使用默认的 SerDe:spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

值得一提的是 Kafka Streams binder 不会对入站密钥进行反序列化 - 它只依赖于 Kafka 本身。因此,您必须在 binding 上指定keySerde property,否则它将默认为 application-wide common keySerde

Binding level key serde:

spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde

Common Key serde:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde

与出站时 KStream 分支的情况一样,每个 binding 设置 value SerDe 的好处是,如果你有多个输入绑定(多个 KStreams object)并且它们都需要单独的 value SerDe,那么你可以单独配置它们。如果使用 common configuration 方法,则此 feature 将不适用。

38.7 错误处理

Apache Kafka Streams 提供了从反序列化错误本机处理 exceptions 的功能。有关此支持的详细信息,请参阅这个开箱即用,Apache Kafka Streams 提供两种反序列化 exception 处理程序 - logAndContinuelogAndFail。如 name 所示,前者将 log 错误并_继续处理下一个记录,后者将 log 错误并失败。 LogAndFail是默认的反序列化 exception 处理程序。

38.7.1 处理反序列化 Exceptions

Kafka Streams binder 通过以下 properties 支持一系列 exception 处理程序。

spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue

除了上面两个反序列化 exception 处理程序之外,binder 还提供了第三个用于将错误记录(毒丸)发送到 DLQ topic。以下是启用此 DLQ exception 处理程序的方法。

spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq

设置上述 property 后,所有反序列化错误记录将自动发送到 DLQ topic。

spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq

如果设置了此项,则错误记录将发送到 topic foo-dlq。如果未设置,则会使用 name error.<input-topic-name>.<group-name>创建 DLQ topic。

在 Kafka Streams binder 中使用 exception 处理 feature 时要记住几件事。

  • property spring.cloud.stream.kafka.streams.binder.serdeError适用于整个 application。这意味着如果同一个 application 中有多个StreamListener方法,则此 property 将应用于所有这些方法。

  • 反序列化的 exception 处理与本机反序列化和 framework 提供的消息转换一致。

38.7.2 处理 Non-Deserialization Exceptions

对于 Kafka Streams binder 中的常规错误处理,最终用户应用程序可以处理 application level 错误。作为为反序列化 exception 处理程序提供 DLQ 的副作用,Kafka Streams binder 提供了一种直接从 application 访问 DLQ 发送 bean 的方法。一旦访问了 bean,就可以以编程方式将 application 中的任何 exception 记录发送到 DLQ。

使用 high-level DSL 继续难以进行强大的错误处理; Kafka Streams 本身不支持错误处理。

但是,在 application 中使用 low-level Processor API 时,可以使用选项来控制此行为。见下文。

@Autowired
private SendToDlqAndContinue dlqHandler;

@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {

    input.process(() -> new Processor() {
    			ProcessorContext context;

    			@Override
    			public void init(ProcessorContext context) {
    				this.context = context;
    			}

    			@Override
    			public void process(Object o, Object o2) {

    			    try {
    			        .....
    			        .....
    			    }
    			    catch(Exception e) {
    			        //explicitly provide the kafka topic corresponding to the input binding as the first argument.
                        //DLQ handler will correctly map to the dlq topic from the actual incoming destination.
                        dlqHandler.sendToDlq("topic-name", (byte[]) o1, (byte[]) o2, context.partition());
    			    }
    			}

    			.....
    			.....
    });
}

38.8 交互式查询

作为公共 Kafka Streams binder API 的一部分,我们公开了一个名为QueryableStoreRegistry的 class。您可以在 application 中将其作为 Spring bean 进行访问。从 application 访问此 bean 的简单方法是在 application 中“自动”bean。

@Autowired
private QueryableStoreRegistry queryableStoreRegistry;

一旦您获得对此 bean 的访问权限,您就可以查询您感兴趣的特定 state-store。见下文。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						queryableStoreRegistry.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

38.9 访问底层的 KafkaStreams object

负责构造KafkaStreams object 的StreamBuilderFactoryBean StreamBuilderFactoryBean可以通过编程方式访问。每个StreamBuilderFactoryBean都注册为stream-builder并附加StreamListener方法 name。如果StreamListener方法被命名为process for example,则流构建器 bean 将命名为stream-builder-process。由于这是一个工厂 bean,因此在以编程方式访问时,应通过添加&符号(&)来访问它。以下是 example 并假设StreamListener方法命名为process

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
			KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();