37. Apache Kafka Binder

37.1 用法

要使用 Apache Kafka binder,您需要将spring-cloud-stream-binder-kafka添加为 Spring Cloud Stream application 的依赖项,如下面 Maven 的 example 所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,你也可以使用 Spring Cloud Stream Kafka Starter,如下面 Maven 的 example 所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

37.2 Apache Kafka Binder 概述

下图显示了 Apache Kafka binder 如何操作的简化图:

图 1_.Kafka Binder

kafka binder

Apache Kafka Binder implementation maps 将每个目标映射到 Apache Kafka topic。 consumer group maps 直接指向相同的 Apache Kafka 概念。对 Apache Kafka 分区也进行 maps 直接分区。

binder 当前使用 Apache Kafka kafka-clients 1.0.0 jar,旨在与至少 version 的 broker 一起使用。此 client 可以与较旧的代理进行通信(请参阅 Kafka 文档),但某些 features 可能不可用。例如,对于早于 0.11.x.x 的版本,不支持本机 headers。此外,0.11.x.x 不支持autoAddPartitions property。

37.3 Configuration 选项

本节包含 Apache Kafka binder 使用的 configuration 选项。

有关 common configuration 选项和与 binder 相关的 properties,请参阅核心文件

37.3.1 Kafka Binder Properties

  • spring.cloud.stream.kafka.binder.brokers

    • Kafka binder 连接的代理列表。

默认值:localhost

  • spring.cloud.stream.kafka.binder.defaultBrokerPort

    • brokers允许使用或不使用 port 信息指定的主机(对于 example,host1,host2:port2)。当 broker 列表中没有配置 port 时,此_set 设置默认 port。

默认值:9092

  • spring.cloud.stream.kafka.binder.configuration

    • client properties(包括 producer 和 consumer)的 Key/Value map 传递给 binder 创建的所有 clients。由于生产者和消费者都使用这些 properties 这一事实,因此应将使用限制在 common properties - 例如,安全设置。

默认值:清空 map。

  • spring.cloud.stream.kafka.binder.headers

    • binder 传输的自定义_header 列表。仅在与旧 applications(⇐1.3.x)与kafka-clients version <0.11.0.0 进行通信时才需要。较新的版本本身支持 headers。

默认值:空。

  • spring.cloud.stream.kafka.binder.healthTimeout

    • time 等待获取分区信息,以秒为单位。如果此计时器到期,健康状况将报告为关闭。

默认值:10。

  • spring.cloud.stream.kafka.binder.requiredAcks

    • broker 上所需的 ack 数。有关 producer acks property 的信息,请参阅 Kafka 文档。

默认值:1

  • spring.cloud.stream.kafka.binder.minPartitionCount

    • 仅在设置了autoCreateTopicsautoAddPartitions时有效。 binder 在其生成或使用数据的主题上配置的 global 最小分区数。它可以被 producer 的partitionCount设置或 producer 的instanceCount * concurrency设置的 value 取代(如果其中任何一个更大)。

默认值:1

  • spring.cloud.stream.kafka.binder.replicationFactor

    • 如果autoCreateTopics是 active,则 auto-created 主题的复制因子。可以在每个 binding 上重写。

默认值:1

  • spring.cloud.stream.kafka.binder.autoCreateTopics

    • 如果设置为true,则 binder 会自动创建新主题。如果设置为false,binder 将依赖于已配置的主题。在后一种情况下,如果主题不存在,则 binder 无法启动。

此设置独立于 broker 的auto.topic.create.enable设置,不会影响它。如果服务器设置为 auto-create 主题,则可以使用默认的 broker 设置将它们创建为元数据检索请求的一部分。

默认值:true

  • spring.cloud.stream.kafka.binder.autoAddPartitions

    • 如果设置为true,则 binder 会根据需要创建新分区。如果设置为false,binder 将依赖于已配置的 topic 的分区大小。如果目标 topic 的分区计数小于预期的 value,则 binder 无法启动。

默认值:false

  • spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

    • 在 binder 中启用 transactions。请参阅 Kafka 文档中的transaction.idspring-kafka文档中的Transactions。启用 transactions 时,将忽略单个producer properties,并且所有生成器都使用spring.cloud.stream.kafka.binder.transaction.producer.* properties。

默认null(没有 transactions)

  • spring.cloud.stream.kafka.binder.transaction.producer.*

    • _ global producer properties for transactional binder 中的生产者。请参阅spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix第 37.3.3 节,“ Kafka Producer Properties”以及所有 binders 支持的常规 producer properties。

默认值:查看单个 producer properties。

  • spring.cloud.stream.kafka.binder.headerMapperBeanName

    • KafkaHeaderMapper的 bean name 用于将spring-messaging headers 与 Kafka headers 进行映射。例如,如果您希望自定义DefaultKafkaHeaderMapper中对 headers 使用 JSON 反序列化的受信任包,请使用此选项。

默认值:none。

37.3.2 Kafka Consumer Properties

以下 properties 仅适用于 Kafka 使用者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。

  • admin.configuration

    • 配置主题时使用的Kafka topic properties - 例如,spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0

默认值:none。

  • admin.replicas-assignment

    • 副本分配的 Map <Integer, List<Integer>>,其中 key 是分区,value 是赋值。在配置新主题时使用。请参阅kafka-clients jar 中的NewTopic Javadoc。

默认值:none。

  • admin.replication-factor

    • 配置主题时要使用的复制因子。覆盖 binder-wide 设置。如果replicas-assignments存在则忽略。

默认值:none(使用 binder-wide 默认值 1)。

  • autoRebalanceEnabled

    • true时, topic 分区会在 consumer group 的成员之间自动重新平衡。当false时,每个 consumer 都会根据spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex分配一组固定的分区。这需要在每个已启动的实例上正确设置spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex properties。在这种情况下,spring.cloud.stream.instanceCount property 的 value 通常必须大于 1。

默认值:true

  • ackEachRecord

    • autoCommitOffsettrue时,此设置指示在处理每个 record 后是否提交偏移量。默认情况下,在处理consumer.poll()返回的一批记录中的所有记录后,将提交偏移。可以使用max.poll.records Kafka property 控制轮询返回的记录数,该max.poll.records 将此设置为true可能会导致 performance 降级,但这样做会降低发生故障时重新传送记录的可能性。另外,请参阅 binder requiredAcks property,它也会影响提交偏移的 performance。

默认值:false

  • autoCommitOffset

    • 是否在处理消息时自动提交偏移量。如果设置为false,则入站消息中将出现带有org.springframework.kafka.support.Acknowledgment标头类型 key kafka_acknowledgment的标头。 Applications 可以使用此标头来确认消息。有关详细信息,请参阅示例部分当此 property 设置为false时,Kafka binder 将 ack 模式设置为org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,application 负责确认记录。另见ackEachRecord

默认值:true

  • autoCommitOnError

    • 仅在autoCommitOffset设置为true时有效。如果设置为false,则会对导致错误的消息禁止 auto-commits,并仅为成功的消息提交。它允许流在上次成功处理的消息中自动重放,以防出现持续故障。如果设置为true,则始终为 auto-commits(如果启用了 auto-commit)。如果未设置(默认值),则它实际上具有与enableDlq,auto-committing 错误消息相同的 value,如果它们被发送到 DLQ 而不提交它们。

默认值:未设置。

  • resetOffsets

    • 是否将 consumer 上的偏移重置为 startOffset 提供的 value。

默认值:false

  • 开始偏移

    • 新组的起始偏移量。允许的值:earliestlatest。如果为 consumer'binding'(通过spring.cloud.stream.bindings.<channelName>.group)显式设置 consumer group,则'startOffset'设置为earliest。否则,anonymous consumer group 的设置为latest。另见resetOffsets(此列表的前面部分)。

默认值:null(相当于earliest)。

  • enableDlq

    • 设置为 true 时,它会为 consumer 启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为error.<destination>.<group>的 topic。可以通过设置dlqName property 来配置 DLQ topic name。对于错误数量相对较小并且重放整个原始 topic 的情况可能过于繁琐的情况,这为更 common Kafka 重放场景提供了备选选项。有关更多信息,请参见第 37.6 节,“Dead-Letter Topic 处理”处理。从 version 2.0 开始,使用以下 headers 增强发送到 DLQ topic 的消息:x-original-topicx-exception-messagex-exception-stacktracebyte[]

默认值:false

  • 组态

    • 带有 key/value 对的 Map 包含泛型 Kafka consumer properties。

默认值:清空 map。

  • dlqName

    • 用于接收错误消息的 DLQ topic 的 name。

默认值:null(如果未指定,则导致错误的消息将转发到名为error.<destination>.<group>的 topic)。

  • dlqProducerProperties

    • 使用它,可以设置 DLQ-specific producer properties。通过 kafka producer properties 提供的所有 properties 都可以通过此 property 设置。

默认值:默认 Kafka producer properties。

  • standardHeaders

    • 指示入站 channel 适配器填充的标准_header。允许的值:noneidtimestampboth。如果使用本机反序列化并且第一个 component 接收消息需要id(例如配置为使用 JDBC 消息 store 的聚合器),则非常有用。

默认值:none

  • converterBeanName

    • 实现RecordMessageConverter的 bean 的 name。在入站 channel 适配器中用于替换默认的MessagingMessageConverter

默认值:null

默认值:30000

37.3.3 Kafka Producer Properties

以下 properties 仅适用于 Kafka 生成器,必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.为前缀。

  • admin.configuration

    • 在配置新主题时使用的Kafka topic properties - 例如,spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0

默认值:none。

  • admin.replicas-assignment

    • 副本分配的 Map <Integer, List<Integer>>,其中 key 是分区,value 是赋值。在配置新主题时使用。请参阅kafka-clients jar 中的NewTopic javadocs。

默认值:none。

  • admin.replication-factor

    • 配置新主题时要使用的复制因子。覆盖 binder-wide 设置。如果replicas-assignments存在则忽略。

默认值:none(使用 binder-wide 默认值 1)。

  • 缓冲区大小

    • Kafka producer 在发送之前尝试批量处理的数据的上限(以字节为单位)。

默认值:16384

  • 同步

    • producer 是否同步。

默认值:false

  • batchTimeout

    • 在发送消息之前,producer 如何等待允许更多消息在同一批次中累积。 (通常情况下,producer 根本不会等待,只是发送上一次发送在 progress.)中时累积的所有消息。一个 non-zero value 可能会以延迟为代价来增加吞吐量。

默认值:0

  • messageKeyExpression

    • 针对用于填充生成的 Kafka 消息的 key 的传出消息评估的 SpEL 表达式 - 对于 example,headers['myKey']。无法使用有效负载,因为通过 time 评估此表达式,有效负载已经是byte[]的形式。

默认值:none

  • headerPatterns

    • 一个 comma-delimited 简单模式列表 match Spring messaging headers 被映射到ProducerRecord中的 Kafka Headers。模式可以以通配符(星号)开头或结尾。可以通过前缀!来取消模式。匹配在第一个 match(正面或负面)之后停止。对于 example !ask,as*将传递ash但不传递ask。永远不会映射idtimestamp

默认值:*(所有 headers - 除了idtimestamp)

  • 组态

    • 带有 key/value 对的 Map 包含通用 Kafka producer properties。

默认值:清空 map。

Kafka binder 使用 producer 的partitionCount设置作为提示来创建具有给定分区计数的 topic(与minPartitionCount一起使用,两者的最大值是使用的 value)。在为 binder 配置minPartitionCount和为 application 配置partitionCount时要小心,因为使用了较大的 value。如果 topic 已存在且分区计数较小且autoAddPartitions已禁用(默认值),则 binder 无法启动。如果已经存在 topic 且分区计数较小且启用了autoAddPartitions,则会添加新分区。如果已存在 topic,其分区数大于(minPartitionCountpartitionCount)的最大值,则使用现有分区计数。

37.3.4 用法示例

在本节中,我们将展示前面的 properties 在特定场景中的用法。

Example:将 autoCommitOffset 设置为 false 并依赖 Manual Acking

此 example 说明了如何在 consumer application 中手动确认偏移量。

此 example 要求将spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为false。为 example 使用相应的输入 channel name。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

Example:安全 Configuration

Apache Kafka 0.9 支持 client 和代理之间的安全连接。要利用此 feature,请遵循Apache Kafka 文档中的指导以及 Kafka 0.9 Confluent 文档中的安全准则。使用spring.cloud.stream.kafka.binder.configuration选项为 binder 创建的所有 clients 设置 security properties。

对于 example,要将security.protocol设置为SASL_SSL,请设置以下 property:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全性 properties 都可以以类似的方式设置。

使用 Kerberos 时,请按照reference 文档中的说明进行 creating 并引用 JAAS configuration。

Spring Cloud Stream 支持使用 JAAS configuration 文件并使用 Spring Boot properties 将 JAAS configuration 信息传递给 application。

使用 JAAS Configuration Files

可以使用 system properties 为 Spring Cloud Stream applications 设置 JAAS 和(可选)krb5 文件位置。以下 example 显示了如何使用 JAAS configuration 文件使用 SASL 和 Kerberos 启动 Spring Cloud Stream application:

java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot Properties

作为拥有 JAAS configuration 文件的替代方法,Spring Cloud Stream 提供了一种机制,通过使用 Spring Boot properties 为 Spring Cloud Stream applications 设置 JAAS configuration。

以下 properties 可用于配置 Kafka client 的 login context:

  • spring.cloud.stream.kafka.binder.jaas.loginModule

    • 登录模块 name。在正常情况下无需设置。

默认值:com.sun.security.auth.module.Krb5LoginModule

  • spring.cloud.stream.kafka.binder.jaas.controlFlag

    • 登录模块的控制 flag。

默认值:required

  • spring.cloud.stream.kafka.binder.jaas.options

    • 带有 key/value 对的 Map 包含登录模块选项。

默认值:清空 map。

以下 example 显示了如何使用 Spring Boot configuration properties 使用 SASL 和 Kerberos 启动 Spring Cloud Stream application:

java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的 example 表示以下 JAAS 文件的等效项:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[emailprotected]";
};

如果 broker 上已存在所需主题或将由管理员创建,则可以关闭自动创建,并且只需要发送 client JAAS properties。

不要在同一个 application 中混合使用 JAAS configuration files 和 Spring Boot properties。如果-Djava.security.auth.login.config system property 已存在,Spring Cloud Stream 将忽略 Spring Boot properties。

使用带有 Kerberos 的autoCreateTopicsautoAddPartitions时要小心。通常,applications 可能使用 Kafka 和 Zookeeper 中没有管理权限的主体。因此,依赖 Spring Cloud Stream 到 create/modify 主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。

示例:暂停和恢复 Consumer

如果您希望暂停使用但不会导致分区重新平衡,则可以暂停并恢复 consumer。通过将Consumer作为参数添加到@StreamListener可以实现这一点。要恢复,您需要ApplicationListener for ListenerContainerIdleEvent实例。发布 events 的频率由idleEventInterval property 控制。由于 consumer 不是 thread-safe,因此必须在调用线程上调用这些方法。

以下简单的 application 显示了如何暂停和恢复:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}

37.4 错误 Channels

从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且还可以配置为将异步 producer 发送失败发送到错误 channel。有关更多信息,请参见第 27.4 节,“错误处理”

发送失败的ErrorMessage的有效负载是带有 properties 的KafkaSendFailureException

  • failedMessage:无法发送的 Spring Messaging Message<?>

  • record:从failedMessage创建的原始ProducerRecord

没有自动处理 producer exceptions(例如发送到Dead-Letter 队列)。您可以使用自己的 Spring Integration 流使用这些 exceptions。

37.5 Kafka Metrics

Kafka binder 模块公开以下 metrics:

spring.cloud.stream.binder.kafka.someGroup.someTopic.lag:此度量标准指示给定 consumer group 从给定 binder 的 topic 中尚未消耗的消息数。例如,如果度量标准spring.cloud.stream.binder.kafka.myGroup.myTopic.lag的 value 是1000,则名为myGroup的 consumer group 有1000消息等待从 topic calle myTopic消耗。此指标对于向 PaaS 平台提供 auto-scaling 反馈特别有用。

37.6 Dead-Letter Topic Processing

因为您无法预测用户将如何处置 dead-lettered 消息,所以 framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始的 topic。但是,如果问题是一个永久性问题,那么可能会导致无限循环。此 topic 中的 sample Spring Boot application 是如何将这些消息重新路由回原始 topic 的示例,但是在三次尝试后它们将它们移动到“停车场”topic。 application 是从 dead-letter topic 读取的另一个 spring-cloud-stream application。它在 5 秒内没有收到任何消息时终止。

这些示例假设原始目标是so8400out,consumer group 是so8400

有几种策略需要考虑:

  • 考虑仅在主 application 不是 running 时才运行重新路由。否则,瞬态错误的重试会很快耗尽。

  • 或者,使用 two-stage 方法:使用此 application 来路由到第三个 topic,另一个 route 从那里返回到主 topic。

以下 code 列表显示 sample application:

application.properties.

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries

应用.

@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

37.7 使用 Kafka Binder 分区

Apache Kafka 原生支持 topic 分区。

有时将数据发送到特定分区是有利的 - 例如,当您想要严格 order 消息处理时(特定客户的所有消息都应该转到同一分区)。

以下 example 显示了如何配置 producer 和 consumer 端:

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partitioned: true
            partition-key-expression: headers['partitionKey']
            partition-count: 12

必须配置 topic 以具有足够的分区以实现所有 consumer 组的所需并发性。上面的 configuration 最多支持 12 个 consumer 实例(如果它们的concurrency是 2 则为 6,如果它们的并发性为 3 则为 4,依此类推)。通常最好“over-provision”分区以允许将来增加消费者或并发性。

前面的 configuration 使用默认分区(key.hashCode() % partitionCount)。根据 key 值,这可能会也可能不会提供适当平衡的算法。您可以使用partitionSelectorExpressionpartitionSelectorClass properties 覆盖此默认值。

由于分区由 Kafka 本地处理,因此 consumer 端不需要特殊的 configuration。 Kafka 在实例之间分配分区。

以下 Spring Boot application 侦听 Kafka 流并打印(到 console)每条消息所针对的分区 ID:

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}

application.yml.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。 Kafka 重新平衡分区分配。如果实例计数(或instance count * concurrency)超过分区数,则某些使用者是 idle。