40. Apache Kafka StreamsBinder

40.1 Usage

要使用 Kafka Streams 绑定程序,只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

40.2 Kafka Streams Binder 概述

Spring Cloud Stream 的 Apache Kafka 支持还包括明确为 Apache Kafka Streams 绑定设计的绑定器实现。通过这种本机集成,Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用Apache Kafka 流 API。

Kafka StreamsBinder 实现基于Spring 的 Kafka 溪流项目提供的基础。

Kafka StreamsBinder 为 Kafka Streams 中的三种主要类型(KStream,KTable 和 GlobalKTable)提供了绑定功能。

作为本机集成的一部分,Kafka Streams API 提供的高级Streams DSL可用于业务逻辑。

也提供Processor API支持的早期版本。

如前所述,Spring Cloud Stream 中的 Kafka Streams 支持仅在处理器模型中严格可用。可以应用一种模型,在该模型中,可以从入站主题读取消息,进行业务处理,并且可以将转换后的消息写入出站主题。它也可以用于无出站目的地的处理器应用程序中。

40.2.1 流 DSL

该应用程序使用来自 Kafka 主题(例如words)的数据,在 5 秒的时间窗口内为每个唯一单词计算单词计数,并将计算出的结果发送到下游主题(例如counts)以进行进一步处理。

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {

	@StreamListener("input")
	@SendTo("output")
	public KStream<?, WordCount> process(KStream<?, String> input) {
		return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}

构建为 uber-jar(例如wordcount-processor.jar)后,您可以像下面一样运行上述示例。

java -jar wordcount-processor.jar  --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts

该应用程序将使用来自 Kafka 主题words的消息,并将计算结果发布到输出主题counts

Spring Cloud Stream 将确保来自传入和传出主题的消息都自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器中所需的逻辑。框架自动处理设置 Kafka Streams 基础结构所需的 Streams DSL 特定配置。

40.3 配置选项

本节包含 Kafka Streams 绑定程序使用的配置选项。

有关与 Binder 有关的常见配置选项和属性,请参阅core documentation

40.3.1 Kafka Streams 属性

以下属性在 Binder 级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.Literals 作为前缀。

  • configuration

    • 使用键/值对进行 Map,该键/值对包含与 Apache Kafka Streams API 有关的属性。此属性必须以spring.cloud.stream.kafka.streams.binder.为前缀。以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关可能用于流配置的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig JavaDocs。

  • brokers

    • Broker URL

默认值:localhost

  • zkNodes

    • Zookeeper URL

默认值:localhost

  • serdeError

    • 反序列化错误处理程序类型。可能的值为-logAndContinuelogAndFailsendToDlq

默认值:logAndFail

  • applicationId

    • 在绑定程序级别全局设置 Kafka Streams 应用程序的 application.id 的简便方法。如果应用程序包含多个StreamListener方法,则应在每个 Importing 绑定的绑定级别上设置 application.id。

默认值:none

以下属性仅可用于 Kafka Streams 生产商,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.字面量作为前缀。为了方便起见,如果存在多个输出绑定并且它们都需要一个公共值,则可以使用前缀spring.cloud.stream.kafka.streams.default.producer.进行配置。

  • keySerde

    • 使用的主要服务

默认值:none

  • valueSerde

    • 使用价值服务

默认值:none

  • useNativeEncoding

    • 标志以启用本机编码

默认值:false

以下属性仅对 Kafka StreamsConsumer 可用,并且必须以spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.literal. For convenience, if there multiple input bindings and they all require a common value, that can be configured by using the prefix spring.cloud.stream.kafka.streams.default.consumer.作为前缀。

  • applicationId

    • 设置每个 Importing 绑定的 application.id。

默认值:none

  • keySerde

    • 使用的主要服务

默认值:none

  • valueSerde

    • 使用价值服务

默认值:none

  • materializedAs

    • 使用传入的 KTable 类型时实现状态存储

默认值:none

  • useNativeDecoding

    • 标志以启用本机解码

默认值:false

  • dlqName

    • DLQ 主题名称。

默认值:none

40.3.2 TimeWindow 属性:

窗口化是流处理应用程序中的重要概念。以下属性可用于配置时间窗口计算。

  • spring.cloud.stream.kafka.streams.timeWindow.length

    • 赋予此属性后,您可以将TimeWindows bean 自动连接到应用程序中。该值以毫秒为单位。

默认值:none

  • spring.cloud.stream.kafka.streams.timeWindow.advanceBy

    • 值以毫秒为单位。

默认值:none

40.4 多个 Importing 绑定

对于需要多个传入 KStream 对象或 KStream 和 KTable 对象组合的用例,Kafka Streams 绑定程序提供了多个绑定支持。

让我们来看看它的作用。

40.4.1 多个 Importing 绑定作为接收器

@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
                    @Input("inputTable") KTable<Long, Song> songTable) {
                    ....
                    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream<?, ?> inputStream();

    @Input("inputTable")
    KTable<?, ?> inputTable();
}

在以上示例中,应用程序被编写为接收器,即没有输出绑定,并且该应用程序必须决定有关下游处理。当您以这种风格编写应用程序时,您可能希望向下游发送信息或将其存储在状态存储中(有关可查询状态存储,请参见下文)。

对于传入的 KTable,如果要将计算具体化为状态存储,则必须通过以下属性将其表示。

spring.cloud.stream.kafka.streams.bindings.inputTable.consumer.materializedAs: all-songs

上面的示例显示了使用 KTable 作为 Importing 绑定。绑定器还支持 GlobalKTable 的 Importing 绑定。当您必须确保应用程序的所有实例都可以访问主题中的数据更新时,GlobalKTable 绑定非常有用。 KTable 和 GlobalKTable 绑定仅在 Importing 上可用。Binder 支持 KStream 的 Importing 和输出绑定。

40.4.2 作为处理器的多个 Importing 绑定

@EnableBinding(KStreamKTableBinding.class)
....
....

@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
                                     @Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}

interface KStreamKTableBinding extends KafkaStreamsProcessor {

    @Input("inputX")
    KTable<?, ?> inputTable();
}

40.5 多个输出绑定(也称为分支)

Kafka Streams 允许根据某些谓词将出站数据分为多个主题。 Kafka StreamsBinder 提供对此功能的支持,而不会损害最终用户应用程序中通过StreamListener公开的编程模型。

您可以按照上面在字数示例中展示的常用方法编写应用程序。但是,使用分支功能时,您需要做一些事情。首先,您需要确保返回类型为KStream[]而不是常规的KStream。其次,您需要按 Sequences 使用包含输出绑定的SendToComments(请参见下面的示例)。对于这些输出绑定中的每一个,您都需要配置目标,Content Type 等,并符合标准的 Spring Cloud Stream 期望。

这是一个例子:

@EnableBinding(KStreamProcessorWithBranches.class)
@EnableAutoConfiguration
public static class WordCountProcessorApplication {

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo({"output1","output2","output3})
    public KStream<?, WordCount>[] process(KStream<Object, String> input) {

			Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
			Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
			Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

			return input
					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
					.groupBy((key, value) -> value)
					.windowedBy(timeWindows)
					.count(Materialized.as("WordCounts-1"))
					.toStream()
					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
					.branch(isEnglish, isFrench, isSpanish);
    }

    interface KStreamProcessorWithBranches {

    		@Input("input")
    		KStream<?, ?> input();

    		@Output("output1")
    		KStream<?, ?> output1();

    		@Output("output2")
    		KStream<?, ?> output2();

    		@Output("output3")
    		KStream<?, ?> output3();
    	}
}

Properties:

spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/json
spring.cloud.stream.bindings.output3.contentType: application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.configuration:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.output1:
  destination: foo
  producer:
    headerMode: raw
spring.cloud.stream.bindings.output2:
  destination: bar
  producer:
    headerMode: raw
spring.cloud.stream.bindings.output3:
  destination: fox
  producer:
    headerMode: raw
spring.cloud.stream.bindings.input:
  destination: words
  consumer:
    headerMode: raw

40.6 邮件转换

与基于消息通道的 Binder 应用程序类似,Kafka StreamsBinder 可适应现成的 Content Type 转换,而不会做出任何妥协。

对于 Kafka Streams 操作而言,通常要知道用于正确转换键和值的 SerDe 类型。因此,在入站和出站转换时依靠 Apache Kafka Streams 库本身提供的 SerDe 工具比使用框架提供的 Content Type 转换更为自然。另一方面,您可能已经熟悉框架提供的 Content Type 转换模式,并且您希望 continue 用于入站和出站转换。

Kafka Streams 绑定程序实现支持这两个选项。

40.6.1 出站序列化

如果禁用本机编码(这是默认设置),则框架将使用用户设置的 contentType 转换消息(否则,将应用默认的application/json)。在这种情况下,它将忽略出站上设置的任何 SerDe,以进行出站序列化。

这是在出站上设置 contentType 的属性。

spring.cloud.stream.bindings.output.contentType: application/json

这是启用本地编码的属性。

spring.cloud.stream.bindings.output.nativeEncoding: true

如果在输出绑定上启用了本机编码(用户必须如上所述明确地启用它),则框架将在出站上跳过任何形式的自动消息转换。在这种情况下,它将切换到用户设置的 Serde。将使用在实际输出绑定上设置的valueSerde属性。这是一个例子。

spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde

如果未设置此属性,则它将使用"default" SerDe:spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

值得一提的是,Kafka StreamsBinder 不会在出站上序列化密钥-它仅依赖于 Kafka 本身。因此,您必须在绑定上指定keySerde属性,否则它将默认为应用程序范围的通用keySerde

绑定级别键序列号:

spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde

公用密钥序列:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde

如果使用分支,则需要使用多个输出绑定。例如,

interface KStreamProcessorWithBranches {

    		@Input("input")
    		KStream<?, ?> input();

    		@Output("output1")
    		KStream<?, ?> output1();

    		@Output("output2")
    		KStream<?, ?> output2();

    		@Output("output3")
    		KStream<?, ?> output3();
    	}

如果设置了nativeEncoding,则可以如下对各个输出绑定设置不同的 SerDe。

spring.cloud.stream.kafka.streams.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kafka.streams.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kafka.streams.bindings.output3.producer.valueSerde=JsonSerde

然后,如果您具有SendTo这样的@SendTo( {"output1", "output2", "output3"}),则将分支中的KStream[]与上面定义的正确的 SerDe 对象一起应用。如果未启用nativeEncoding,则可以如下在输出绑定上设置不同的 contentType 值。在这种情况下,框架将使用适当的消息转换器来转换消息,然后再发送给 Kafka。

spring.cloud.stream.bindings.output1.contentType: application/json
spring.cloud.stream.bindings.output2.contentType: application/java-serialzied-object
spring.cloud.stream.bindings.output3.contentType: application/octet-stream

40.6.2 入站反序列化

类似的规则适用于入站数据反序列化。

如果禁用本机解码(这是默认设置),则框架将使用用户设置的 contentType 转换消息(否则,将应用默认的application/json)。在这种情况下,它将针对入站反序列化而忽略入站上设置的任何 SerDe。

这是在入站上设置 contentType 的属性。

spring.cloud.stream.bindings.input.contentType: application/json

这是启用本机解码的属性。

spring.cloud.stream.bindings.input.nativeDecoding: true

如果在 Importing 绑定上启用了本机解码(用户必须如上所述明确地启用它),则框架将跳过对入站进行的任何消息转换。在这种情况下,它将切换到用户设置的 SerDe。将使用在实际输出绑定上设置的valueSerde属性。这是一个例子。

spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde

如果未设置此属性,它将使用默认的 SerDe:spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

值得一提的是,Kafka Streams 绑定程序不会反序列化入站的密钥-它仅依赖于 Kafka 本身。因此,您必须在绑定上指定keySerde属性,否则它将默认为应用程序范围的通用keySerde

绑定级别键序列号:

spring.cloud.stream.kafka.streams.bindings.input.consumer.keySerde

公用密钥序列:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde

与在出站上进行 KStream 分支的情况一样,为每个绑定设置值 SerDe 的好处是,如果您有多个 Importing 绑定(多个 KStreams 对象)并且它们都需要单独的值 SerDe,则可以分别配置它们。如果使用通用配置方法,则此功能将不适用。

40.7 错误处理

Apache Kafka Streams 提供了本机处理反序列化错误引起的异常的功能。有关此支持的详细信息,请参见this开箱即用,Apache Kafka Streams 提供了两种反序列化异常处理程序-logAndContinuelogAndFail。顾名思义,前者将记录错误并 continue 处理下一条记录,而后者将记录错误并失败。 LogAndFail是默认的反序列化异常处理程序。

40.7.1 处理反序列化异常

Kafka StreamsBinder 通过以下属性支持一系列异常处理程序。

spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue

除了以上两个反序列化异常处理程序外,绑定程序还提供了第三个用于将错误记录(毒丸)发送到 DLQ 主题的代理。这是启用此 DLQ 异常处理程序的方法。

spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq

设置上述属性后,所有反序列化错误记录都会自动发送到 DLQ 主题。

spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq

如果已设置,则错误记录将发送到主题foo-dlq。如果未设置,它将创建一个名称为error.<input-topic-name>.<group-name>的 DLQ 主题。

在 Kafka StreamsBinder 中使用异常处理功能时,需要记住两件事。

  • 属性spring.cloud.stream.kafka.streams.binder.serdeError适用于整个应用程序。这意味着,如果同一应用程序中有多个StreamListener方法,则此属性将应用于所有方法。

  • 反序列化的异常处理与本机反序列化和框架提供的消息转换一致。

40.7.2 处理非反序列化异常

对于 Kafka Streams 联编程序中的常规错误处理,最终用户应用程序可以处理应用程序级错误。作为为反序列化异常处理程序提供 DLQ 的副作用,Kafka Streams 绑定程序提供了一种直接从您的应用程序访问 DLQ 发送 bean 的方法。一旦访问了该 bean,就可以以编程方式将任何异常记录从应用程序发送到 DLQ。

使用高级 DSL 进行错误处理仍然很困难。 Kafka Streams 本身还不支持错误处理。

但是,当您在应用程序中使用低级处理器 API 时,有一些选项可以控制此行为。见下文。

@Autowired
private SendToDlqAndContinue dlqHandler;

@StreamListener("input")
@SendTo("output")
public KStream<?, WordCount> process(KStream<Object, String> input) {

    input.process(() -> new Processor() {
    			ProcessorContext context;

    			@Override
    			public void init(ProcessorContext context) {
    				this.context = context;
    			}

    			@Override
    			public void process(Object o, Object o2) {

    			    try {
    			        .....
    			        .....
    			    }
    			    catch(Exception e) {
    			        //explicitly provide the kafka topic corresponding to the input binding as the first argument.
                        //DLQ handler will correctly map to the dlq topic from the actual incoming destination.
                        dlqHandler.sendToDlq("topic-name", (byte[]) o1, (byte[]) o2, context.partition());
    			    }
    			}

    			.....
    			.....
    });
}

40.8 国家 Store

使用 DSL 时,状态存储由 Kafka Streams 自动创建。使用处理器 API 时,您需要手动注册状态存储。为此,您可以使用KafkaStreamsStateStoreComments。您可以指定存储的名称和类型,用于控制日志和禁用高速缓存的标志等。一旦在引导阶段由绑定程序创建了存储,就可以通过处理器 API 访问此状态存储。下面是一些执行此操作的 Primitives。

创建状态存储:

@KafkaStreamsStateStore(name="mystate", type= KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs=300000)
public void process(KStream<Object, Product> input) {
    ...
}

访问状态存储:

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

40.9 交互式查询

作为公开的 Kafka Streams 绑定程序 API 的一部分,我们公开了一个名为InteractiveQueryService的类。您可以在应用程序中将其作为 Spring bean 访问。从您的应用程序访问该 bean 的一种简单方法是“自动装配”该 bean。

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦获得对该 bean 的访问权限,就可以查询您感兴趣的特定状态存储。见下文。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

如果有多个 Kafka Streams 应用程序实例正在运行,则在以交互方式查询它们之前,您需要确定哪个应用程序实例承载密钥。 InteractiveQueryService API 提供了用于标识主机信息的方法。

为了使它起作用,必须配置属性application.server,如下所示:

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些代码片段:

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

40.10 访问基础的 KafkaStreams 对象

可以通过编程方式访问 spring-kafka 中负责构建KafkaStreams对象的StreamBuilderFactoryBean。每个StreamBuilderFactoryBean被注册为stream-builder并附加了StreamListener方法名称。例如,如果您的StreamListener方法命名为process,则流生成器 bean 命名为stream-builder-process。由于这是工厂 bean,因此在以编程方式访问它时,应在前面加上一个&符号(&)以对其进行访问。下面是一个示例,并假定StreamListener方法名为process

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
			KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();

40.11 状态清除

默认情况下,绑定停止时将调用Kafkastreams.cleanup()方法。参见Spring Kafka 文档。要修改此行为,只需向应用程序上下文中添加一个CleanupConfig @Bean(配置为在启动,停止或都不清除时进行清除);该 bean 将被检测到并连接到工厂 bean 中。