39. Apache KafkaBinder

39.1 Usage

要使用 Apache KafkaBinder,您需要将spring-cloud-stream-binder-kafka作为依赖项添加到 Spring Cloud Stream 应用程序中,如以下 Maven 示例所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

另外,您也可以使用 Spring Cloud Stream Kafka Starter,如以下针对 Maven 的示例所示:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

39.2 Apache Kafka Binder 概述

下图显示了 Apache KafkaBinder 的工作方式的简化图:

图 39.1. Kafka·宾德

Apache Kafka Binder 实现将每个目标 Map 到一个 Apache Kafka 主题。Consumer 组直接 Map 到相同的 Apache Kafka 概念。分区也直接 Map 到 Apache Kafka 分区。

Binder 当前使用 Apache Kafka kafka-clients 1.0.0 jar,并且设计用于至少该版本的代理。该 Client 端可以与较旧的代理进行通信(请参阅 Kafka 文档),但是某些功能可能不可用。例如,对于低于 0.11.x.x 的版本,不支持本机头。此外,0.11.x.x 不支持autoAddPartitions属性。

39.3 配置选项

本节包含 Apache KafkaBinder 使用的配置选项。

有关与 Binder 有关的常见配置选项和属性,请参见core documentation

39.3.1 Kafka Binder 属性

默认值:localhost

默认值:9092

默认值:空 Map。

默认值:空 Map。

Default: empty.

Default: 10.

默认值:1

默认值:1

默认值:空 Map。

默认值:1

Note

此设置与代理的auto.topic.create.enable设置无关,并且不影响它。如果服务器设置为自动创建主题,则可以使用默认代理设置将它们作为元数据检索请求的一部分进行创建。

默认值:true

默认值:false

默认值null(无 Transaction)

默认值:查看单个生产者属性。

Default: none.

39.3.2 KafkaConsumer 属性

以下属性仅适用于 Kafka 使用者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.作为前缀。

Default: none.

Default: none.

默认值:无(使用资料夹范围的默认值 1)。

默认值:true

默认值:false

默认值:true

默认值:未设置。

默认值:false

默认值:null(等于earliest)。

默认值:false

默认值:空 Map。

默认值:null(如果未指定,则导致错误的消息将转发到名为error.<destination>.<group>的主题)。

默认值:默认的 Kafka 生产者属性。

默认值:none

默认值:null

默认值:30000

默认值:false

39.3.3 Kafka Producer 属性

以下属性仅适用于 Kafka 生产者,并且必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.作为前缀。

Default: none.

Default: none.

默认值:无(使用资料夹范围的默认值 1)。

默认值:16384

默认值:false

默认值:0

默认值:none

默认值:*(所有 Headers-idtimestamp除外)

默认值:空 Map。

Note

KafkaBinder 使用生产者的partitionCount设置作为提示来创建具有给定分区数的主题(与minPartitionCount结合使用,两者中的最大值是所使用的值)。同时为绑定程序配置minPartitionCount和为应用程序配置partitionCount时要谨慎,因为使用了较大的值。如果已经存在一个分区数较小的主题,并且已禁用autoAddPartitions(缺省值),则绑定器无法启动。如果已经存在一个分区数较小的主题,并且已启用autoAddPartitions,则会添加新的分区。如果已经存在的主题的分区数量大于最大值(minPartitionCountpartitionCount),则使用现有分区计数。

39.3.4 用法示例

在本节中,我们将说明针对特定场景使用前面的属性。

示例:将 autoCommitOffset 设置为 false 并依靠手动激活

此示例说明了如何在用户应用程序中手动确认偏移。

此示例要求将spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为false。在您的示例中使用相应的 Importing 通道名称。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

示例:安全配置

Apache Kafka 0.9 支持 Client 端和代理之间的安全连接。要利用此功能,请遵循Apache Kafka 文档和 Kafka 0.9 Confluent 文档中的安全性准则中的准则。使用spring.cloud.stream.kafka.binder.configuration选项可以为 Binder 创建的所有 Client 端设置安全属性。

例如,要将security.protocol设置为SASL_SSL,请设置以下属性:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

可以以类似方式设置所有其他安全属性。

使用 Kerberos 时,请遵循reference documentation中的说明创建和引用 JAAS 配置。

Spring Cloud Stream 支持通过使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。

使用 JAAS 配置文件

可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例显示如何通过使用 JAAS 配置文件使用 SASL 和 Kerberos 启动 Spring Cloud Stream 应用程序:

java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性

作为使用 JAAS 配置文件的替代方法,Spring Cloud Stream 提供了一种通过使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置的机制。

以下属性可用于配置 KafkaClient 端的登录上下文:

默认值:com.sun.security.auth.module.Krb5LoginModule

默认值:required

默认值:空 Map。

以下示例显示如何通过使用 Spring Boot 配置属性使用 SASL 和 Kerberos 启动 Spring Cloud Stream 应用程序:

java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下 JAAS 文件的等效项:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[emailprotected]";
};

如果所需的主题已经存在于代理上或将由 Management 员创建,则可以关闭自动创建,仅需要发送 Client 端 JAAS 属性。

Note

不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。如果-Djava.security.auth.login.config系统属性已经存在,则 Spring Cloud Stream 将忽略 Spring Boot 属性。

Note

autoCreateTopicsautoAddPartitions与 Kerberos 一起使用时要小心。通常,应用程序可能使用在 Kafka 和 Zookeeper 中没有 Management 权限的主体。因此,依靠 Spring Cloud Stream 创建/修改主题可能会失败。在安全的环境中,我们强烈建议您使用 Kafka 工具创建主题并以 Management 方式 ManagementACL。

示例:暂停和恢复使用方

如果希望暂停使用但不引起分区重新平衡,则可以暂停并恢复使用方。通过将Consumer作为参数添加到@StreamListener中,可以轻松实现这一点。要恢复,您需要为ListenerContainerIdleEvent个实例提供ApplicationListener。事件的发布频率由idleEventInterval属性控制。由于使用者不是线程安全的,因此必须在调用线程上调用这些方法。

以下简单的应用程序显示了如何暂停和恢复:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@StreamListener(Sink.INPUT)
	public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
		System.out.println(in);
		consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
	}

	@Bean
	public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
		return event -> {
			System.out.println(event);
			if (event.getConsumer().paused().size() > 0) {
				event.getConsumer().resume(event.getConsumer().paused());
			}
		};
	}

}

39.4 错误通道

从版本 1.3 开始,绑定程序无条件地将异常发送到每个使用者目的地的错误通道,也可以将其配置为将异步生产者发送失败消息发送到错误通道。有关更多信息,请参见第 29.4 节“错误处理”

发送失败的ErrorMessage的有效载荷是具有以下属性的KafkaSendFailureException

无法自动处理生产者异常(例如发送到Dead-Letter queue)。您可以通过自己的 Spring Integration 流使用这些异常。

39.5KafkaMetrics

KafkaBinder 模块公开以下 Metrics:

spring.cloud.stream.binder.kafka.offset:此度量标准指示给定的 Consumer 组尚未从给定的 Binder 主题中消费多少消息。提供的 Metrics 基于 MircometerMetrics 库。度量标准包含 Consumer 组信息,主题以及与主题上的最新偏移量有关的承诺偏移量的实际滞后时间。该 Metrics 对于向 PaaS 平台提供自动缩放反馈特别有用。

39.6 死信主题处理

因为您无法预期用户将如何处置死信,所以该框架没有提供任何标准机制来处理它们。如果死信的原因是短暂的,则您可能希望将消息路由回原始主题。但是,如果问题是永久性问题,则可能导致无限循环。本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但是在尝试了三遍之后,它将这些消息移至“停车场”主题。该应用程序是另一个从死信主题中读取的 spring-cloud-stream 应用程序。 5 秒钟未收到任何消息时,它将终止。

这些示例假定原始目的地为so8400out,而使用者组为so8400

有两种策略可供考虑:

以下代码清单显示了示例应用程序:

application.properties.

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries

Application.

@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

39.7 使用 KafkaBinder 进行分区

Apache Kafka 本机支持主题分区。

有时,将数据发送到特定的分区是有好处的,例如,当您要严格 Order 消息处理时(特定 Client 的所有消息都应转到同一分区)。

以下示例显示了如何配置生产者和 Consumer 端:

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}

application.yml.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partitioned: true
            partition-key-expression: headers['partitionKey']
            partition-count: 12

Tip

必须为该主题提供足够的分区,以实现所有 Consumer 组所需的并发性。上面的配置最多支持 12 个使用者实例(如果它们的concurrency为 2,则为 6;如果它们的并发值为 3,则为 4,依此类推)。通常最好“过量配置”分区,以使将来的使用者或并发性增加。

Note

前面的配置使用默认分区(key.hashCode() % partitionCount)。根据键值,这可能会或可能不会提供适当的平衡算法。您可以使用partitionSelectorExpressionpartitionSelectorClass属性覆盖此默认设置。

由于分区是由 Kafka 本地处理的,因此在用户端不需要特殊配置。 Kafka 在实例之间分配分区。

以下 Spring Boot 应用程序侦听 Kafka 流并打印(到控制台)每条消息发送到的分区 ID:

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}

application.yml.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。 Kafka 重新平衡分区分配。如果实例计数(或instance count * concurrency)超过分区数,则某些使用者处于空闲状态。

首页