39. Apache KafkaBinder

39.1 Usage

要使用 Apache KafkaBinder,您需要将spring-cloud-stream-binder-kafka作为依赖项添加到 Spring Cloud Stream 应用程序中,如以下 Maven 示例所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

另外,您也可以使用 Spring Cloud Stream Kafka Starter,如以下针对 Maven 的示例所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

39.2 Apache Kafka Binder 概述

下图显示了 Apache KafkaBinder 的工作方式的简化图:

图 39.1. Kafka·宾德

kafka binder

Apache Kafka Binder 实现将每个目标 Map 到一个 Apache Kafka 主题。Consumer 组直接 Map 到相同的 Apache Kafka 概念。分区也直接 Map 到 Apache Kafka 分区。

Binder 当前使用 Apache Kafka kafka-clients 1.0.0 jar,并且设计用于至少该版本的代理。该 Client 端可以与较旧的代理进行通信(请参阅 Kafka 文档),但是某些功能可能不可用。例如,对于低于 0.11.x.x 的版本,不支持本机头。此外,0.11.x.x 不支持autoAddPartitions属性。

39.3 配置选项

本节包含 Apache KafkaBinder 使用的配置选项。

有关与 Binder 有关的常见配置选项和属性,请参见core documentation

39.3.1 Kafka Binder 属性

  • spring.cloud.stream.kafka.binder.brokers

    • KafkaBinder 连接到的 broker 列表。

默认值:localhost

  • spring.cloud.stream.kafka.binder.defaultBrokerPort

    • brokers允许指定带有或不带有端口信息的主机(例如host1,host2:port2)。当在代理列表中未配置任何端口时,这将设置默认端口。

默认值:9092

  • spring.cloud.stream.kafka.binder.configuration

    • Client 端属性(生产者和 Consumer)的键/值 Map 传递给绑定程序创建的所有 Client 端。由于生产者和 Consumer 都使用了这些属性,因此应将使用限制为通用属性,例如安全设置。此处的属性取代引导中设置的所有属性。

默认值:空 Map。

  • spring.cloud.stream.kafka.binder.consumerProperties

    • 任意 KafkaClient 端使用者属性的键/值 Map。此处的属性将取代在启动和上面的configuration属性中设置的所有属性。

默认值:空 Map。

  • spring.cloud.stream.kafka.binder.headers

    • Binder 传输的自定义 Headers 列表。仅当与kafka-clients版本<0.11.0.0 的较旧应用程序(⇐1.3.x)通信时才需要。较新的版本本机支持 Headers。

Default: empty.

  • spring.cloud.stream.kafka.binder.healthTimeout

    • await 获取分区信息的时间,以秒为单位。如果此计时器到期,运行状况将报告为已关闭。

Default: 10.

  • spring.cloud.stream.kafka.binder.requiredAcks

    • 代理程序上所需的确认数。请参阅生产者acks属性的 Kafka 文档。

默认值:1

  • spring.cloud.stream.kafka.binder.minPartitionCount

    • 仅在设置了autoCreateTopicsautoAddPartitions时有效。Binder 在生成或使用数据的主题上配置的全局最小分区数。可以通过生产者的partitionCount设置或生产者的instanceCount * concurrency设置的值(如果较大)代替。

默认值:1

  • spring.cloud.stream.kafka.binder.producerProperties

    • 任意 KafkaClient 端生产者属性的键/值 Map。此处的属性将取代在启动和上面的configuration属性中设置的所有属性。

默认值:空 Map。

  • spring.cloud.stream.kafka.binder.replicationFactor

    • 如果autoCreateTopics处于Active状态,则自动创建的主题的复制因子。可以在每个绑定上覆盖。

默认值:1

  • spring.cloud.stream.kafka.binder.autoCreateTopics

    • 如果设置为true,则 Binder 将自动创建新主题。如果设置为false,则 Binder 依赖于已配置的主题。在后一种情况下,如果主题不存在,则 Binder 无法启动。

Note

此设置与代理的auto.topic.create.enable设置无关,并且不影响它。如果服务器设置为自动创建主题,则可以使用默认代理设置将它们作为元数据检索请求的一部分进行创建。

默认值:true

  • spring.cloud.stream.kafka.binder.autoAddPartitions

    • 如果设置为true,则绑定器会根据需要创建新的分区。如果设置为false,则 Binder 依赖于已配置的主题的分区大小。如果目标主题的分区数小于预期值,则 Binder 无法启动。

默认值:false

  • spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

    • 在 Binder 中启用事务。请参阅 Kafka 文档中的transaction.idspring-kafka文档中的Transactions。启用事务后,单个producer属性将被忽略,所有生产者都将使用spring.cloud.stream.kafka.binder.transaction.producer.*属性。

默认值null(无 Transaction)

  • spring.cloud.stream.kafka.binder.transaction.producer.*

    • Transaction 绑定中生产者的 Global 生产者属性。请参阅spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix第 39.3.3 节“ Kafka 生产者属性”以及所有 Binder 支持的常规生产者属性。

默认值:查看单个生产者属性。

  • spring.cloud.stream.kafka.binder.headerMapperBeanName

    • KafkaHeaderMapper的 bean 名称,用于将spring-messagingHeadersMap 到 KafkaHeaders 和从 KafkaHeadersMap。例如,如果您希望在使用 JSON 反序列化头的DefaultKafkaHeaderMapper中自定义受信任的程序包,请使用此方法。

Default: none.

39.3.2 KafkaConsumer 属性

以下属性仅适用于 Kafka 使用者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.作为前缀。

  • admin.configuration

    • 设置主题时使用的Map Kafka 主题属性,例如spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0

Default: none.

  • admin.replicas-assignment

    • 副本分配的 Map<Integer, List>,键为分区,值为分配。在配置新主题时使用。请参阅kafka-clients jar 中的NewTopic Javadocs。

Default: none.

  • admin.replication-factor

    • 设置主题时要使用的复制因子。覆盖 Binder 范围的设置。是否忽略replicas-assignments

默认值:无(使用资料夹范围的默认值 1)。

  • autoRebalanceEnabled

    • true时,主题分区将在使用者组的成员之间自动重新平衡。当false时,将为每个使用者分配基于spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex的一组固定分区。这要求在每个启动的实例上都正确设置spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex属性。在这种情况下,spring.cloud.stream.instanceCount属性的值通常必须大于 1.

默认值:true

  • ackEachRecord

    • autoCommitOffsettrue时,此设置指示在处理每个记录后是否提交偏移量。默认情况下,在处理consumer.poll()返回的记录批次中的所有记录之后,将提交偏移量。轮询返回的记录数可以通过max.poll.records Kafka 属性控制,该属性是通过使用者configuration属性设置的。将此设置为true可能会导致性能下降,但是这样做会降低发生故障时重新传送记录的可能性。另外,请参见 BinderrequiredAcks属性,该属性也影响落实偏移量的性能。

默认值:false

  • autoCommitOffset

    • 处理消息后是否自动提交偏移量。如果设置为false,则入站消息中将出现具有org.springframework.kafka.support.AcknowledgmentHeaders 类型的键kafka_acknowledgment的 Headers。应用程序可以使用此 Headers 来确认消息。有关详细信息,请参见示例部分。当此属性设置为false时,KafkaBinder 将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,应用程序负责确认记录。另请参见ackEachRecord

默认值:true

  • autoCommitOnError

    • 仅在autoCommitOffset设置为true时有效。如果设置为false,它将抑制导致错误的消息的自动提交,仅对成功的消息进行提交。如果持续出现故障,它允许流从上次成功处理的消息自动重播。如果设置为true,它将始终自动提交(如果启用了自动提交)。如果未设置(默认值),则它实际上具有与enableDlq相同的值,如果将错误消息发送到 DLQ,则自动提交错误消息,否则不提交。

默认值:未设置。

  • resetOffsets

    • 是否将使用者的偏移量重置为 startOffset 提供的值。

默认值:false

  • startOffset

    • 新组的起始偏移量。允许的值:earliestlatest。如果为 Consumer“绑定”显式设置了 Consumer 组(通过spring.cloud.stream.bindings.<channelName>.group),则“ startOffset”将设置为earliest。否则,对于anonymous用户组将其设置为latest。另请参见resetOffsets(在此列表的前面)。

默认值:null(等于earliest)。

  • enableDlq

    • 设置为 true 时,它将为使用者启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为error.<destination>.<group>的主题。可以通过设置dlqName属性来配置 DLQ 主题名称。当错误数量相对较小并且重放整个原始主题可能太麻烦时,这为更常见的 Kafka 重播方案提供了一个替代选项。有关更多信息,请参见第 39.6 节“死信主题处理”处理。从 2.0 版开始,发送到 DLQ 主题的消息将通过以下 Headers 得到增强:x-original-topicx-exception-messagex-exception-stacktrace作为byte[]destinationIsPatterntrue时不允许.

默认值:false

  • configuration

    • 使用包含通用 Kafka 使用者属性的键/值对进行 Map。

默认值:空 Map。

  • dlqName

    • 接收错误消息的 DLQ 主题的名称。

默认值:null(如果未指定,则导致错误的消息将转发到名为error.<destination>.<group>的主题)。

  • dlqProducerProperties

    • 使用此功能,可以设置特定于 DLQ 的生产者属性。通过 kafka 生产者属性可以使用的所有属性都可以通过该属性设置。

默认值:默认的 Kafka 生产者属性。

  • standardHeaders

    • 指示入站通道适配器填充哪些标准头。允许的值:noneidtimestampboth。如果使用本机反序列化并且第一个组件接收消息需要id(例如配置为使用 JDBC 消息存储的聚合器),则很有用。

默认值:none

  • converterBeanName

    • 实现RecordMessageConverter的 bean 的名称。在入站通道适配器中用于替换默认的MessagingMessageConverter

默认值:null

默认值:30000

  • destinationIsPattern

    • 如果为 true,则将目的地视为正则表达式Pattern,以供代理匹配主题名称。如果为 true,则不设置主题,并且不允许enableDlq,因为绑定程序在设置阶段不知道主题名称。请注意,检测与模式匹配的新主题所花费的时间由使用者属性metadata.max.age.ms控制,该属性(在撰写本文时)默认为 300,000ms(5 分钟)。可以使用上面的configuration属性进行配置。

默认值:false

39.3.3 Kafka Producer 属性

以下属性仅适用于 Kafka 生产者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.作为前缀。

  • admin.configuration

    • 设置新主题时使用的Map Kafka 主题属性,例如spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0

Default: none.

  • admin.replicas-assignment

    • 副本分配的 Map<Integer, List>,键为分区,值为分配。在配置新主题时使用。请参阅kafka-clients jar 中的NewTopic javadocs。

Default: none.

  • admin.replication-factor

    • 设置新主题时要使用的复制因子。覆盖 Binder 范围的设置。是否忽略replicas-assignments

默认值:无(使用资料夹范围的默认值 1)。

  • bufferSize

    • Kafka 生产者在发送前尝试分批处理的数据量的上限(以字节为单位)。

默认值:16384

  • sync

    • 生产者是否同步。

默认值:false

  • batchTimeout

    • 生产者在发送消息之前要 await 多长时间才能在同一批中累积更多消息。 (通常,生产者根本不 await,仅发送在上一次发送过程中累积的所有消息.)非零值可能会增加吞吐量,但会增加延迟。

默认值:0

  • messageKeyExpression

    • 根据用于填充产生的 Kafka 消息密钥的传出消息评估 SpEL 表达式,例如headers['myKey']。有效负载无法使用,因为在评估此表达式时,有效负载已经采用byte[]的形式。

默认值:none

  • headerPatterns

    • 以逗号分隔的简单模式列表,以匹配要 Map 到ProducerRecord中的 Kafka Headers的 Spring 消息头。模式可以以通配符(星号)开头或结尾。可以通过以!作为前缀来否定模式。match 在第一个 match(正数或负数)之后停止。例如,!ask,as*将传递ash而不是askidtimestamp从未 Map。

默认值:*(所有 Headers-idtimestamp除外)

  • configuration

    • 使用包含通用 Kafka 生产者属性的键/值对进行 Map。

默认值:空 Map。

Note

KafkaBinder 使用生产者的partitionCount设置作为提示来创建具有给定分区数的主题(与minPartitionCount结合使用,两者中的最大值是所使用的值)。同时为绑定程序配置minPartitionCount和为应用程序配置partitionCount时要谨慎,因为使用了较大的值。如果已经存在一个分区数较小的主题,并且已禁用autoAddPartitions(缺省值),则绑定器无法启动。如果已经存在一个分区数较小的主题,并且已启用autoAddPartitions,则会添加新的分区。如果已经存在的主题的分区数量大于最大值(minPartitionCountpartitionCount),则使用现有分区计数。

39.3.4 用法示例

在本节中,我们将说明针对特定场景使用前面的属性。

示例:将 autoCommitOffset 设置为 false 并依靠手动激活

此示例说明了如何在用户应用程序中手动确认偏移。

此示例要求将spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为false。在您的示例中使用相应的 Importing 通道名称。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

示例:安全配置

Apache Kafka 0.9 支持 Client 端和代理之间的安全连接。要利用此功能,请遵循Apache Kafka 文档和 Kafka 0.9 Confluent 文档中的安全性准则中的准则。使用spring.cloud.stream.kafka.binder.configuration选项可以为 Binder 创建的所有 Client 端设置安全属性。

例如,要将security.protocol设置为SASL_SSL,请设置以下属性:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

可以以类似方式设置所有其他安全属性。

使用 Kerberos 时,请遵循reference documentation中的说明创建和引用 JAAS 配置。

Spring Cloud Stream 支持通过使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。

使用 JAAS 配置文件

可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例显示如何通过使用 JAAS 配置文件使用 SASL 和 Kerberos 启动 Spring Cloud Stream 应用程序:

java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性

作为使用 JAAS 配置文件的替代方法,Spring Cloud Stream 提供了一种通过使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置的机制。

以下属性可用于配置 KafkaClient 端的登录上下文:

  • spring.cloud.stream.kafka.binder.jaas.loginModule

    • 登录模块名称。正常情况下无需设置。

默认值:com.sun.security.auth.module.Krb5LoginModule

  • spring.cloud.stream.kafka.binder.jaas.controlFlag

    • 登录模块的控制标志。

默认值:required

  • spring.cloud.stream.kafka.binder.jaas.options

    • 使用包含登录模块选项的键/值对进行 Map。

默认值:空 Map。

以下示例显示如何通过使用 Spring Boot 配置属性使用 SASL 和 Kerberos 启动 Spring Cloud Stream 应用程序:

java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下 JAAS 文件的等效项:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[emailprotected]";
};

如果所需的主题已经存在于代理上或将由 Management 员创建,则可以关闭自动创建,仅需要发送 Client 端 JAAS 属性。

Note

不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。如果-Djava.security.auth.login.config系统属性已经存在,则 Spring Cloud Stream 将忽略 Spring Boot 属性。

Note

autoCreateTopicsautoAddPartitions与 Kerberos 一起使用时要小心。通常,应用程序可能使用在 Kafka 和 Zookeeper 中没有 Management 权限的主体。因此,依靠 Spring Cloud Stream 创建/修改主题可能会失败。在安全的环境中,我们强烈建议您使用 Kafka 工具创建主题并以 Management 方式 ManagementACL。

示例:暂停和恢复使用方

如果希望暂停使用但不引起分区重新平衡,则可以暂停并恢复使用方。通过将Consumer作为参数添加到@StreamListener中,可以轻松实现这一点。要恢复,您需要为ListenerContainerIdleEvent个实例提供ApplicationListener。事件的发布频率由idleEventInterval属性控制。由于使用者不是线程安全的,因此必须在调用线程上调用这些方法。

以下简单的应用程序显示了如何暂停和恢复:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}

39.4 错误通道

从版本 1.3 开始,绑定程序无条件地将异常发送到每个使用者目的地的错误通道,也可以将其配置为将异步生产者发送失败消息发送到错误通道。有关更多信息,请参见第 29.4 节“错误处理”

发送失败的ErrorMessage的有效载荷是具有以下属性的KafkaSendFailureException

  • failedMessage:未能发送的 Spring Messaging Message<?>

  • record:从failedMessage创建的原始ProducerRecord

无法自动处理生产者异常(例如发送到Dead-Letter queue)。您可以通过自己的 Spring Integration 流使用这些异常。

39.5KafkaMetrics

KafkaBinder 模块公开以下 Metrics:

spring.cloud.stream.binder.kafka.offset:此度量标准指示给定的 Consumer 组尚未从给定的 Binder 主题中消费多少消息。提供的 Metrics 基于 MircometerMetrics 库。度量标准包含 Consumer 组信息,主题以及与主题上的最新偏移量有关的承诺偏移量的实际滞后时间。该 Metrics 对于向 PaaS 平台提供自动缩放反馈特别有用。

39.6 死信主题处理

因为您无法预期用户将如何处置死信,所以该框架没有提供任何标准机制来处理它们。如果死信的原因是短暂的,则您可能希望将消息路由回原始主题。但是,如果问题是永久性问题,则可能导致无限循环。本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但是在尝试了三遍之后,它将这些消息移至“停车场”主题。该应用程序是另一个从死信主题中读取的 spring-cloud-stream 应用程序。 5 秒钟未收到任何消息时,它将终止。

这些示例假定原始目的地为so8400out,而使用者组为so8400

有两种策略可供考虑:

  • 考虑仅在主应用程序未运行时才运行重新路由。否则,瞬态错误的重试会很快用完。

  • 或者,使用两阶段方法:使用此应用程序将路由到第三个主题,将另一个应用程序从那里路由回到主主题。

以下代码清单显示了示例应用程序:

application.properties.

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries

Application.

@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

39.7 使用 KafkaBinder 进行分区

Apache Kafka 本机支持主题分区。

有时,将数据发送到特定的分区是有好处的,例如,当您要严格 Order 消息处理时(特定 Client 的所有消息都应转到同一分区)。

以下示例显示了如何配置生产者和 Consumer 端:

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partitioned: true
            partition-key-expression: headers['partitionKey']
            partition-count: 12

Tip

必须为该主题提供足够的分区,以实现所有 Consumer 组所需的并发性。上面的配置最多支持 12 个使用者实例(如果它们的concurrency为 2,则为 6;如果它们的并发值为 3,则为 4,依此类推)。通常最好“过量配置”分区,以使将来的使用者或并发性增加。

Note

前面的配置使用默认分区(key.hashCode() % partitionCount)。根据键值,这可能会或可能不会提供适当的平衡算法。您可以使用partitionSelectorExpressionpartitionSelectorClass属性覆盖此默认设置。

由于分区是由 Kafka 本地处理的,因此在用户端不需要特殊配置。 Kafka 在实例之间分配分区。

以下 Spring Boot 应用程序侦听 Kafka 流并打印(到控制台)每条消息发送到的分区 ID:

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}

application.yml.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。 Kafka 重新平衡分区分配。如果实例计数(或instance count * concurrency)超过分区数,则某些使用者处于空闲状态。