37. Apache Kafka Binder

37.1 用法

要使用 Apache Kafka binder,只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream application:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream Kafka Starter。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

37.2 Apache Kafka Binder 概述

有关 Apache Kafka binder 如何操作的简化图表如下所示。

图 1_.Kafka Binder

kafka binder

Apache Kafka Binder implementation maps 将每个目标映射到 Apache Kafka topic。 consumer group maps 直接指向相同的 Apache Kafka 概念。对 Apache Kafka 分区也进行 maps 直接分区。

37.3 Configuration 选项

本节包含 Apache Kafka binder 使用的 configuration 选项。

有关 common configuration 选项和与 binder 相关的 properties,请参阅核心文件

37.3.1 Kafka Binder Properties

  • spring.cloud.stream.kafka.binder.brokers

    • Kafka binder 将连接的代理列表。

默认值:localhost

  • spring.cloud.stream.kafka.binder.defaultBrokerPort

    • brokers允许使用或不使用 port 信息(e.g. ,host1,host2:port2)指定的主机。当 broker 列表中没有配置 port 时,此_set 设置默认 port。

默认值:9092

  • spring.cloud.stream.kafka.binder.zkNodes

    • Kafka binder 可以连接的 ZooKeeper 节点列表。

默认值:localhost

  • spring.cloud.stream.kafka.binder.defaultZkPort

    • zkNodes允许使用或不使用 port 信息(e.g. ,host1,host2:port2)指定的主机。如果在节点列表中未配置 port,则设置默认 port。

默认值:2181

  • spring.cloud.stream.kafka.binder.configuration

    • client properties(包括 producer 和 consumer)的 Key/Value map 传递给 binder 创建的所有 clients。由于生产者和消费者都将使用这些 properties,因此应限制使用 common properties,尤其是安全设置。

默认值:清空 map。

  • spring.cloud.stream.kafka.binder.headers

    • 将由 binder 传输的自定义_header 列表。

默认值:空。

  • spring.cloud.stream.kafka.binder.healthTimeout

    • time 等待以秒为单位获取分区信息;默认 60.如果此计时器到期,运行状况将报告为关闭。

默认值:10。

  • spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow

    • 保存偏移的频率(以毫秒为单位)。如果0忽略。

默认值:10000

  • spring.cloud.stream.kafka.binder.offsetUpdateCount

    • 消耗偏移的频率(更新次数)将保持不变。如果0忽略。与offsetUpdateTimeWindow互斥。

默认值:0

  • spring.cloud.stream.kafka.binder.requiredAcks

    • broker 上所需的 ack 数。

默认值:1

  • spring.cloud.stream.kafka.binder.minPartitionCount

    • 仅在设置了autoCreateTopicsautoAddPartitions时有效。 binder 将在 produces/consumes 数据主题上配置的 global 最小分区数。它可以被 producer 的partitionCount设置或 producer 的instanceCount * concurrency设置的 value 取代(如果其中任何一个更大)。

默认值:1

  • spring.cloud.stream.kafka.binder.replicationFactor

    • 如果autoCreateTopics是 active,则 auto-created 主题的复制因子。

默认值:1

  • spring.cloud.stream.kafka.binder.autoCreateTopics

    • 如果设置为true,binder 将自动创建新主题。如果设置为false,binder 将依赖于已配置的主题。在后一种情况下,如果主题不存在,binder 将无法启动。值得注意的是,此设置独立于 broker 的auto.topic.create.enable设置,并且不会影响它:如果服务器设置为 auto-create 主题,则可以使用默认的 broker 设置将其创建为元数据检索请求的一部分。

默认值:true

  • spring.cloud.stream.kafka.binder.autoAddPartitions

    • 如果设置为true,binder 将根据需要创建添加新分区。如果设置为false,则 binder 将依赖已配置的 topic 的分区大小。如果目标 topic 的分区计数小于预期的 value,则 binder 将无法启动。

默认值:false

  • spring.cloud.stream.kafka.binder.socketBufferSize

    • Kafka 使用者使用的 socket 缓冲区的大小(以字节为单位)。

默认值:2097152

37.3.2 Kafka Consumer Properties

以下 properties 仅适用于 Kafka 使用者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。

  • autoRebalanceEnabled

    • true时, topic 分区将在 consumer group 的成员之间自动重新平衡。当false时,每个 consumer 将根据spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex分配一组固定的分区。这需要在每个已启动的实例上正确设置spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex properties。在这种情况下,property spring.cloud.stream.instanceCount通常必须大于 1。

默认值:true

  • autoCommitOffset

    • 是否在处理消息时自动提交偏移量。如果设置为false,则入站消息中将出现带有org.springframework.kafka.support.Acknowledgment标头类型 key kafka_acknowledgment的标头。 Applications 可以使用此标头来确认消息。有关详细信息,请参阅示例部分当此 property 设置为false时,Kafka binder 会将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL

默认值:true

  • autoCommitOnError

    • 仅在autoCommitOffset设置为true时有效。如果设置为false,它会针对导致错误的消息抑制 auto-commits,并且仅针对成功消息提交,则允许流从上次成功处理的消息中自动重播,以防出现持续故障。如果设置为true,则始终为 auto-commit(如果启用了 auto-commit)。如果未设置(默认),则它实际上具有与enableDlq,auto-committing 错误消息相同的 value,如果它们被发送到 DLQ,否则不提交它们。

默认值:未设置。

  • recoveryInterval

    • 连接恢复尝试之间的间隔,以毫秒为单位。

默认值:5000

  • 开始偏移

    • 新组的起始偏移量。允许的值:earliestlatest。如果 consumer group 是为 consumer'binding'(通过spring.cloud.stream.bindings.<channelName>.group)明确设置的,则'startOffset'设置为earliest;否则为anonymous consumer group 设置为latest

默认值:null(相当于earliest)。

  • enableDlq

    • 设置为 true 时,它将为 consumer 发送启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为error.<destination>.<group>的 topic。 DLQ topic name 可以通过 property dlqName进行配置。对于错误数量相对较小并且重放整个原始 topic 的情况可能过于繁琐的情况,这为更 common Kafka 重放场景提供了备选选项。

默认值:false

  • 组态

    • 带有 key/value 对的 Map 包含泛型 Kafka consumer properties。

默认值:清空 map。

  • dlqName

    • 用于接收错误消息的 DLQ topic 的 name。

默认值:null(如果未指定,则导致错误的消息将转发到名为error.<destination>.<group>的 topic)。

37.3.3 Kafka Producer Properties

以下 properties 仅适用于 Kafka 生成器,必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.为前缀。

  • 缓冲区大小

    • 在发送之前 Kafka producer 将尝试批量处理多少数据的上限(以字节为单位)。

默认值:16384

  • 同步

    • producer 是否同步。

默认值:false

  • batchTimeout

    • producer 在发送 order 之前等待多长时间以允许更多消息在同一批次中累积。 (通常情况下,producer 根本不会等待,只是发送上一次发送在 progress.)中时累积的所有消息。一个 non-zero value 可能会以延迟为代价增加吞吐量。

默认值:0

  • messageKeyExpression

    • 根据用于填充生成的 Kafka 消息的 key 的传出消息计算 SpEL 表达式。对于 example headers.keypayload.myKey

默认值:none

  • 组态

    • 带有 key/value 对的 Map 包含通用 Kafka producer properties。

默认值:清空 map。

Kafka binder 将使用 producer 的partitionCount设置作为提示来创建具有给定分区计数的 topic(与minPartitionCount一起使用,两者中的最大值是使用的 value)。在为 binder 配置minPartitionCount和为 application 配置partitionCount时要小心,因为将使用更大的 value。如果 topic 已存在且分区计数较小且autoAddPartitions被禁用(默认值),则 binder 将无法启动。如果 topic 已存在且分区计数较小且启用了autoAddPartitions,则将添加新分区。如果已存在 topic 且分区数大于(minPartitionCountpartitionCount)的最大分区数,则将使用现有分区计数。

37.3.4 用法示例

在本节中,我们将说明上述 properties 在特定场景中的用法。

示例:设置 autoCommitOffset false 并依赖于手动执行。

此 example 说明了如何在 consumer application 中手动确认偏移量。

此 example 要求spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为 false。为 example 使用相应的输入 channel name。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

示例:security configuration

Apache Kafka 0.9 支持 client 和代理之间的安全连接。要利用此 feature,请遵循Apache Kafka 文档中的指导以及 Kafka 0.9 Confluent 文档中的安全准则。使用spring.cloud.stream.kafka.binder.configuration选项为 binder 创建的所有 clients 设置 security properties。

对于 example,要将security.protocol设置为SASL_SSL,请设置:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全性 properties 都可以以类似的方式设置。

使用 Kerberos 时,请按照reference 文档中的说明进行 creating 并引用 JAAS configuration。

Spring Cloud Stream 支持使用 JAAS configuration 文件并使用 Spring Boot properties 将 JAAS configuration 信息传递给 application。

使用 JAAS configuration files

可以使用 system properties 为 Spring Cloud Stream applications 设置 JAAS 和(可选)krb5 文件位置。以下是使用 JAAS configuration 文件使用 SASL 和 Kerberos 启动 Spring Cloud Stream application 的示例:

java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot properties

作为拥有 JAAS configuration 文件的替代方法,Spring Cloud Stream 提供了一种机制,用于使用 Spring Boot properties 为 Spring Cloud Stream applications 设置 JAAS configuration。

以下 properties 可用于配置 Kafka client 的 login context。

  • spring.cloud.stream.kafka.binder.jaas.loginModule

    • 登录模块 name。在正常情况下无需设置。

默认值:com.sun.security.auth.module.Krb5LoginModule

  • spring.cloud.stream.kafka.binder.jaas.controlFlag

    • 登录模块的控制 flag。

默认值:required

  • spring.cloud.stream.kafka.binder.jaas.options

    • 带有 key/value 对的 Map 包含登录模块选项。

默认值:清空 map。

这是使用 Spring Boot configuration properties 使用 SASL 和 Kerberos 启动 Spring Cloud Stream application 的示例:

java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.[emailprotected]EXAMPLE.COM

这表示以下 JAAS 文件的等效项:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[emailprotected]";
};

如果 broker 上已存在所需主题,或者将由管理员创建,则可以关闭自动创建,并且只需要发送 client JAAS properties。作为设置spring.cloud.stream.kafka.binder.autoCreateTopics的替代方法,您只需从 application 中删除 broker 依赖项即可。有关详细信息,请参阅名为“从基于 binder 的 application 的 classpath 中排除 Kafka broker jar”一节

不要在同一个 application 中混合使用 JAAS configuration files 和 Spring Boot properties。如果-Djava.security.auth.login.config system property 已经存在,Spring Cloud Stream 将忽略 Spring Boot properties。

如果使用 Kerberos,请在使用autoCreateTopicsautoAddPartitions时务必小心。通常 applications 可能使用 Kafka 和 Zookeeper 中没有管理权限的主体,并且依赖 Spring Cloud Stream to create/modify 主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。

将 binder 与 Apache 一起使用 Kafka 0.10

Spring Cloud Stream Kafka binder 中的默认 Kafka 支持适用于 Kafka version 0.10.1.1. binder 还支持连接到其他基于 0.10 的版本和 0.9 clients。在_ to to to this 中,当您创建包含 application 的项目时,请像通常对默认 binder 那样包含spring-cloud-starter-stream-kafka。然后在 pom.xml 文件的<dependencies>部分顶部添加这些依赖项以覆盖依赖项。

这是将 application 降级为 0.10.0.1 的示例。由于它仍然在 0.10 line 上,因此可以保留默认的spring-kafkaspring-integration-kafka版本。

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>

这是使用 0.9.0.1 version 的另一个例子。

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.0.5.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-kafka</artifactId>
  <version>2.0.1.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>

上述版本仅为 example 提供。为获得最佳结果,我们建议使用最新的 0.10-compatible 版本的项目。

从基于 binder 的 application 的 classpath 中排除 Kafka broker jar

Apache Kafka Binder 使用管理实用程序作为 Apache Kafka 服务器 library 的一部分来创建和重新配置主题。如果在运行时不需要包含 Apache Kafka 服务器 library 及其依赖项,因为 application 将依赖于以管理方式配置的主题,Kafka binder 允许从 application 中排除 Apache Kafka 服务器依赖项。

如果您按照上面的建议对 Kafka 依赖项使用非默认版本,那么您所要做的就是不要包含 kafka broker 依赖项。如果使用默认的 Kafka version,请确保从spring-cloud-starter-stream-kafka依赖项中排除 kafka broker jar,如下所示。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
    </exclusion>
  </exclusions>
</dependency>

如果排除 Apache Kafka 服务器依赖项并且服务器上不存在 topic,则 Apache Kafka broker 将在服务器上启用 auto topic 创建时创建 topic。请记住,如果您依赖于此,那么 Kafka 服务器将使用默认的分区数和复制因子。另一方面,如果在服务器上禁用了自动 topic 创建,则必须小心 running application 以创建具有所需分区数的 topic。

如果要完全控制分区的分配方式,请保留默认设置 i.e。不要排除 kafka broker jar 并确保spring.cloud.stream.kafka.binder.autoCreateTopics设置为true,这是默认值。

37.4 Kafka Streams Binding Spring Cloud Stream 的功能

Spring Cloud Stream Kafka 支持还包括专为 Kafka Streams binding 设计的 binder。使用此 binder,可以编写利用 Kafka Streams API 的 applications。有关 Kafka Streams 的更多信息,请参阅Kafka Streams API 开发人员手册

_Spaf 中的 Kafka Streams 支持_Cloud Stream 基于 Spring Kafka 项目提供的基础。有关该支持的详细信息,请参阅Kafaka Streams 支持 Spring Kafka

以下是 Spring Cloud Stream KStream binder artifact 的 maven 坐标。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>

除了利用基于 Spring Boot 的 Spring Cloud Stream 编程 model 之外,KStream binder 提供的主要其他好处之一是它避免了在直接使用 Kafka Streams API 时需要编写的样板配置。通过 Kafka Streams API 提供的高 level 流 DSL 可以在当前支持中通过 Spring Cloud Stream 使用。

37.4.1 用法高级流的 DSL 示例

这个 application 将从 Kafka topic 中侦听,并为 5 秒 time 窗口中看到的每个唯一单词写入单词 count。

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {

	@StreamListener("input")
	@SendTo("output")
	public KStream<?, String> process(KStream<?, String> input) {
		return input
				.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
				.map((key, word) -> new KeyValue<>(word, word))
				.groupByKey(Serdes.String(), Serdes.String())
				.count(TimeWindows.of(5000), "store-name")
				.toStream()
				.map((w, c) -> new KeyValue<>(null, "Count for " + w.key() + ": " + c));
	}

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}

如果你 build 它 string Boot runnable fat jar,你可以用以下方式运行上面的 example:

java -jar uber.jar  --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts

这意味着 application 将从传入的 Kafka topic 单词中侦听并写入输出 topic 计数。

Spring Cloud Stream 将确保来自传入和传出主题的消息都绑定为 KStream objects。正如人们可能观察到的那样,开发人员可以专注于 code,i.e 的业务方面。编写处理器中所需的逻辑,而不是设置 Kafka Streams 基础结构所需的特定于流的 configuration。所有这些样板都由 Spring Cloud Stream 在幕后处理。

37.4.2 支持交互式查询

如果交互式查询需要访问KafkaStreams,则可以通过KStreamBuilderFactoryBean.getKafkaStreams()访问内部KafkaStreams实例。您可以自动装配 KStream binder 提供的KStreamBuilderFactoryBean实例。然后你可以从中获取KafkaStreams实例并检索底层的 store,对它执行查询等。

37.4.3 Kafka Streams properties

  • 组态

    • Map 包含 key/value 对,包含与 Kafka Streams API 相关的 properties。此 property 必须以spring.cloud.stream.kstream.binder.为前缀。
Following are some examples of using this property.
spring.cloud.stream.kstream.binder.configuration.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs.

也可以有 binding 特定的 properties。

例如,您可以为输入或输出目的地使用不同的 Serde。

spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
  • timewindow.length

    • 使用 Kafka Streams 编写的许多流式应用程序涉及 windowning 操作。如果指定此 property,则会自动提供org.apache.kafka.streams.kstream.TimeWindows bean,可以在 applications 中自动装配。此 property 必须以spring.cloud.stream.kstream.为前缀。仅当提供此 property 时,才会创建org.apache.kafka.streams.kstream.TimeWindows类型的 bean。
Following is an example of using this property.
Values are provided in milliseconds.
spring.cloud.stream.kstream.timeWindow.length=5000
  • timewindow.advanceBy

    • 这个 property 与timewindow.length齐头并进,并且对它自己没有任何影响。如果您提供此 property,则生成的org.apache.kafka.streams.kstream.TimeWindows bean 将自动保留此信息。此 property 必须以spring.cloud.stream.kstream.为前缀。
Following is an example of using this property.
Values are provided in milliseconds.
spring.cloud.stream.kstream.timeWindow.advanceBy=1000

37.5 错误 Channels

从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且可以配置为将异步 producer 发送失败发送到错误 channel。有关更多信息,请参见名为“Message Channel Binder and Error Channels”的部分

发送失败的ErrorMessage的有效负载是带有 properties 的KafkaSendFailureException

  • failedMessage - 无法发送的 spring-messaging Message<?>

  • record - 从failedMessage创建的原始ProducerRecord

没有自动处理这些 exceptions(例如发送到Dead-Letter 队列);您可以使用自己的 Spring Integration 流来使用这些 exceptions。

37.6 Kafka Metrics

Kafka binder 模块公开以下 metrics:

spring.cloud.stream.binder.kafka.someGroup.someTopic.lag - 此度量标准表示给定 binder group 的给定 binder 的 topic 尚未消耗的消息数量。对于 example,如果度量spring.cloud.stream.binder.kafka.myGroup.myTopic.lag的 value 是1000,那么 consumer group myGroup有等待从 topic myTopic消耗的1000消息。此指标对于向您选择的 PaaS 平台提供 auto-scaling 反馈特别有用。

37.7 Dead-Letter Topic Processing

由于无法预测用户如何处理 dead-lettered 消息,framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始的 topic。但是,如果问题是一个永久性问题,那么可能会导致无限循环。以下spring-boot application 是如何将这些消息重新路由回原始 topic 的示例,但在三次尝试后将它们移动到第三个“停车场”topic。 application 只是从 dead-letter topic 读取的另一个 spring-cloud-stream application。它在 5 秒内没有收到任何消息时终止。

这些示例假设原始目标是so8400out,consumer group 是so8400

有几个注意事项。

  • 当主 application 不是 running 时,只考虑运行重新路由。否则,瞬态错误的重试将很快用完。

  • 或者,使用 two-stage 方法 - 使用此 application 来路由到第三个 topic,使用另一个 route 从那里返回到主 topic。

  • 由于此技术使用消息头来跟踪重试,因此它不适用于headerMode=raw。在这种情况下,请考虑向有效负载添加一些数据(主应用程序可以忽略这些数据)。

  • 必须将x-retries添加到headers property spring.cloud.stream.kafka.binder.headers=x-retries和_ application 上,以便在 applications 之间传输标头。

  • 由于 kafka 是 publish/subscribe,因此重播的消息将被发送到每个 consumer group,甚至是那些在第一个 time 时成功处理消息的消息。

application.properties.

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries

应用.

@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}