37. Apache Kafka Binder

37.1 用法

要使用 Apache Kafka binder,您需要将spring-cloud-stream-binder-kafka添加为 Spring Cloud Stream application 的依赖项,如下面 Maven 的 example 所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,你也可以使用 Spring Cloud Stream Kafka Starter,如下面 Maven 的 example 所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

37.2 Apache Kafka Binder 概述

下图显示了 Apache Kafka binder 如何操作的简化图:

图 1_.Kafka Binder

Apache Kafka Binder implementation maps 将每个目标映射到 Apache Kafka topic。 consumer group maps 直接指向相同的 Apache Kafka 概念。对 Apache Kafka 分区也进行 maps 直接分区。

binder 当前使用 Apache Kafka kafka-clients 1.0.0 jar,旨在与至少 version 的 broker 一起使用。此 client 可以与较旧的代理进行通信(请参阅 Kafka 文档),但某些 features 可能不可用。例如,对于早于 0.11.x.x 的版本,不支持本机 headers。此外,0.11.x.x 不支持autoAddPartitions property。

37.3 Configuration 选项

本节包含 Apache Kafka binder 使用的 configuration 选项。

有关 common configuration 选项和与 binder 相关的 properties,请参阅核心文件

37.3.1 Kafka Binder Properties

默认值:localhost

默认值:9092

默认值:清空 map。

默认值:空。

默认值:10。

默认值:1

默认值:1

默认值:1

此设置独立于 broker 的auto.topic.create.enable设置,不会影响它。如果服务器设置为 auto-create 主题,则可以使用默认的 broker 设置将它们创建为元数据检索请求的一部分。

默认值:true

默认值:false

默认null(没有 transactions)

默认值:查看单个 producer properties。

默认值:none。

37.3.2 Kafka Consumer Properties

以下 properties 仅适用于 Kafka 使用者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。

默认值:none。

默认值:none。

默认值:none(使用 binder-wide 默认值 1)。

默认值:true

默认值:false

默认值:true

默认值:未设置。

默认值:false

默认值:null(相当于earliest)。

默认值:false

默认值:清空 map。

默认值:null(如果未指定,则导致错误的消息将转发到名为error.<destination>.<group>的 topic)。

默认值:默认 Kafka producer properties。

默认值:none

默认值:null

默认值:30000

37.3.3 Kafka Producer Properties

以下 properties 仅适用于 Kafka 生成器,必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.为前缀。

默认值:none。

默认值:none。

默认值:none(使用 binder-wide 默认值 1)。

默认值:16384

默认值:false

默认值:0

默认值:none

默认值:*(所有 headers - 除了idtimestamp)

默认值:清空 map。

Kafka binder 使用 producer 的partitionCount设置作为提示来创建具有给定分区计数的 topic(与minPartitionCount一起使用,两者的最大值是使用的 value)。在为 binder 配置minPartitionCount和为 application 配置partitionCount时要小心,因为使用了较大的 value。如果 topic 已存在且分区计数较小且autoAddPartitions已禁用(默认值),则 binder 无法启动。如果已经存在 topic 且分区计数较小且启用了autoAddPartitions,则会添加新分区。如果已存在 topic,其分区数大于(minPartitionCountpartitionCount)的最大值,则使用现有分区计数。

37.3.4 用法示例

在本节中,我们将展示前面的 properties 在特定场景中的用法。

Example:将 autoCommitOffset 设置为 false 并依赖 Manual Acking

此 example 说明了如何在 consumer application 中手动确认偏移量。

此 example 要求将spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为false。为 example 使用相应的输入 channel name。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

Example:安全 Configuration

Apache Kafka 0.9 支持 client 和代理之间的安全连接。要利用此 feature,请遵循Apache Kafka 文档中的指导以及 Kafka 0.9 Confluent 文档中的安全准则。使用spring.cloud.stream.kafka.binder.configuration选项为 binder 创建的所有 clients 设置 security properties。

对于 example,要将security.protocol设置为SASL_SSL,请设置以下 property:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全性 properties 都可以以类似的方式设置。

使用 Kerberos 时,请按照reference 文档中的说明进行 creating 并引用 JAAS configuration。

Spring Cloud Stream 支持使用 JAAS configuration 文件并使用 Spring Boot properties 将 JAAS configuration 信息传递给 application。

使用 JAAS Configuration Files

可以使用 system properties 为 Spring Cloud Stream applications 设置 JAAS 和(可选)krb5 文件位置。以下 example 显示了如何使用 JAAS configuration 文件使用 SASL 和 Kerberos 启动 Spring Cloud Stream application:

java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot Properties

作为拥有 JAAS configuration 文件的替代方法,Spring Cloud Stream 提供了一种机制,通过使用 Spring Boot properties 为 Spring Cloud Stream applications 设置 JAAS configuration。

以下 properties 可用于配置 Kafka client 的 login context:

默认值:com.sun.security.auth.module.Krb5LoginModule

默认值:required

默认值:清空 map。

以下 example 显示了如何使用 Spring Boot configuration properties 使用 SASL 和 Kerberos 启动 Spring Cloud Stream application:

java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的 example 表示以下 JAAS 文件的等效项:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[emailprotected]";
};

如果 broker 上已存在所需主题或将由管理员创建,则可以关闭自动创建,并且只需要发送 client JAAS properties。

不要在同一个 application 中混合使用 JAAS configuration files 和 Spring Boot properties。如果-Djava.security.auth.login.config system property 已存在,Spring Cloud Stream 将忽略 Spring Boot properties。

使用带有 Kerberos 的autoCreateTopicsautoAddPartitions时要小心。通常,applications 可能使用 Kafka 和 Zookeeper 中没有管理权限的主体。因此,依赖 Spring Cloud Stream 到 create/modify 主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。

示例:暂停和恢复 Consumer

如果您希望暂停使用但不会导致分区重新平衡,则可以暂停并恢复 consumer。通过将Consumer作为参数添加到@StreamListener可以实现这一点。要恢复,您需要ApplicationListener for ListenerContainerIdleEvent实例。发布 events 的频率由idleEventInterval property 控制。由于 consumer 不是 thread-safe,因此必须在调用线程上调用这些方法。

以下简单的 application 显示了如何暂停和恢复:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}

37.4 错误 Channels

从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且还可以配置为将异步 producer 发送失败发送到错误 channel。有关更多信息,请参见第 27.4 节,“错误处理”

发送失败的ErrorMessage的有效负载是带有 properties 的KafkaSendFailureException

没有自动处理 producer exceptions(例如发送到Dead-Letter 队列)。您可以使用自己的 Spring Integration 流使用这些 exceptions。

37.5 Kafka Metrics

Kafka binder 模块公开以下 metrics:

spring.cloud.stream.binder.kafka.someGroup.someTopic.lag:此度量标准指示给定 consumer group 从给定 binder 的 topic 中尚未消耗的消息数。例如,如果度量标准spring.cloud.stream.binder.kafka.myGroup.myTopic.lag的 value 是1000,则名为myGroup的 consumer group 有1000消息等待从 topic calle myTopic消耗。此指标对于向 PaaS 平台提供 auto-scaling 反馈特别有用。

37.6 Dead-Letter Topic Processing

因为您无法预测用户将如何处置 dead-lettered 消息,所以 framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始的 topic。但是,如果问题是一个永久性问题,那么可能会导致无限循环。此 topic 中的 sample Spring Boot application 是如何将这些消息重新路由回原始 topic 的示例,但是在三次尝试后它们将它们移动到“停车场”topic。 application 是从 dead-letter topic 读取的另一个 spring-cloud-stream application。它在 5 秒内没有收到任何消息时终止。

这些示例假设原始目标是so8400out,consumer group 是so8400

有几种策略需要考虑:

以下 code 列表显示 sample application:

application.properties.

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries

应用.

@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

37.7 使用 Kafka Binder 分区

Apache Kafka 原生支持 topic 分区。

有时将数据发送到特定分区是有利的 - 例如,当您想要严格 order 消息处理时(特定客户的所有消息都应该转到同一分区)。

以下 example 显示了如何配置 producer 和 consumer 端:

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partitioned: true
            partition-key-expression: headers['partitionKey']
            partition-count: 12

必须配置 topic 以具有足够的分区以实现所有 consumer 组的所需并发性。上面的 configuration 最多支持 12 个 consumer 实例(如果它们的concurrency是 2 则为 6,如果它们的并发性为 3 则为 4,依此类推)。通常最好“over-provision”分区以允许将来增加消费者或并发性。

前面的 configuration 使用默认分区(key.hashCode() % partitionCount)。根据 key 值,这可能会也可能不会提供适当平衡的算法。您可以使用partitionSelectorExpressionpartitionSelectorClass properties 覆盖此默认值。

由于分区由 Kafka 本地处理,因此 consumer 端不需要特殊的 configuration。 Kafka 在实例之间分配分区。

以下 Spring Boot application 侦听 Kafka 流并打印(到 console)每条消息所针对的分区 ID:

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}

application.yml.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。 Kafka 重新平衡分区分配。如果实例计数(或instance count * concurrency)超过分区数,则某些使用者是 idle。

首页