37. Apache Kafka Binder

37.1 用法

要使用 Apache Kafka binder,只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream application:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream Kafka Starter。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

37.2 Apache Kafka Binder 概述

有关 Apache Kafka binder 如何操作的简化图表如下所示。

图 1_.Kafka Binder

Apache Kafka Binder implementation maps 将每个目标映射到 Apache Kafka topic。 consumer group maps 直接指向相同的 Apache Kafka 概念。对 Apache Kafka 分区也进行 maps 直接分区。

37.3 Configuration 选项

本节包含 Apache Kafka binder 使用的 configuration 选项。

有关 common configuration 选项和与 binder 相关的 properties,请参阅核心文件

37.3.1 Kafka Binder Properties

默认值:localhost

默认值:9092

默认值:localhost

默认值:2181

默认值:清空 map。

默认值:空。

默认值:10。

默认值:10000

默认值:0

默认值:1

默认值:1

默认值:1

默认值:true

默认值:false

默认值:2097152

37.3.2 Kafka Consumer Properties

以下 properties 仅适用于 Kafka 使用者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.为前缀。

默认值:true

默认值:true

默认值:未设置。

默认值:5000

默认值:null(相当于earliest)。

默认值:false

默认值:清空 map。

默认值:null(如果未指定,则导致错误的消息将转发到名为error.<destination>.<group>的 topic)。

37.3.3 Kafka Producer Properties

以下 properties 仅适用于 Kafka 生成器,必须以spring.cloud.stream.kafka.bindings.<channelName>.producer.为前缀。

默认值:16384

默认值:false

默认值:0

默认值:none

默认值:清空 map。

Kafka binder 将使用 producer 的partitionCount设置作为提示来创建具有给定分区计数的 topic(与minPartitionCount一起使用,两者中的最大值是使用的 value)。在为 binder 配置minPartitionCount和为 application 配置partitionCount时要小心,因为将使用更大的 value。如果 topic 已存在且分区计数较小且autoAddPartitions被禁用(默认值),则 binder 将无法启动。如果 topic 已存在且分区计数较小且启用了autoAddPartitions,则将添加新分区。如果已存在 topic 且分区数大于(minPartitionCountpartitionCount)的最大分区数,则将使用现有分区计数。

37.3.4 用法示例

在本节中,我们将说明上述 properties 在特定场景中的用法。

示例:设置 autoCommitOffset false 并依赖于手动执行。

此 example 说明了如何在 consumer application 中手动确认偏移量。

此 example 要求spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset设置为 false。为 example 使用相应的输入 channel name。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}

示例:security configuration

Apache Kafka 0.9 支持 client 和代理之间的安全连接。要利用此 feature,请遵循Apache Kafka 文档中的指导以及 Kafka 0.9 Confluent 文档中的安全准则。使用spring.cloud.stream.kafka.binder.configuration选项为 binder 创建的所有 clients 设置 security properties。

对于 example,要将security.protocol设置为SASL_SSL,请设置:

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全性 properties 都可以以类似的方式设置。

使用 Kerberos 时,请按照reference 文档中的说明进行 creating 并引用 JAAS configuration。

Spring Cloud Stream 支持使用 JAAS configuration 文件并使用 Spring Boot properties 将 JAAS configuration 信息传递给 application。

使用 JAAS configuration files

可以使用 system properties 为 Spring Cloud Stream applications 设置 JAAS 和(可选)krb5 文件位置。以下是使用 JAAS configuration 文件使用 SASL 和 Kerberos 启动 Spring Cloud Stream application 的示例:

java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot properties

作为拥有 JAAS configuration 文件的替代方法,Spring Cloud Stream 提供了一种机制,用于使用 Spring Boot properties 为 Spring Cloud Stream applications 设置 JAAS configuration。

以下 properties 可用于配置 Kafka client 的 login context。

默认值:com.sun.security.auth.module.Krb5LoginModule

默认值:required

默认值:清空 map。

这是使用 Spring Boot configuration properties 使用 SASL 和 Kerberos 启动 Spring Cloud Stream application 的示例:

java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.[emailprotected]EXAMPLE.COM

这表示以下 JAAS 文件的等效项:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[emailprotected]";
};

如果 broker 上已存在所需主题,或者将由管理员创建,则可以关闭自动创建,并且只需要发送 client JAAS properties。作为设置spring.cloud.stream.kafka.binder.autoCreateTopics的替代方法,您只需从 application 中删除 broker 依赖项即可。有关详细信息,请参阅名为“从基于 binder 的 application 的 classpath 中排除 Kafka broker jar”一节

不要在同一个 application 中混合使用 JAAS configuration files 和 Spring Boot properties。如果-Djava.security.auth.login.config system property 已经存在,Spring Cloud Stream 将忽略 Spring Boot properties。

如果使用 Kerberos,请在使用autoCreateTopicsautoAddPartitions时务必小心。通常 applications 可能使用 Kafka 和 Zookeeper 中没有管理权限的主体,并且依赖 Spring Cloud Stream to create/modify 主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。

将 binder 与 Apache 一起使用 Kafka 0.10

Spring Cloud Stream Kafka binder 中的默认 Kafka 支持适用于 Kafka version 0.10.1.1. binder 还支持连接到其他基于 0.10 的版本和 0.9 clients。在_ to to to this 中,当您创建包含 application 的项目时,请像通常对默认 binder 那样包含spring-cloud-starter-stream-kafka。然后在 pom.xml 文件的<dependencies>部分顶部添加这些依赖项以覆盖依赖项。

这是将 application 降级为 0.10.0.1 的示例。由于它仍然在 0.10 line 上,因此可以保留默认的spring-kafkaspring-integration-kafka版本。

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>

这是使用 0.9.0.1 version 的另一个例子。

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.0.5.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-kafka</artifactId>
  <version>2.0.1.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>

上述版本仅为 example 提供。为获得最佳结果,我们建议使用最新的 0.10-compatible 版本的项目。

从基于 binder 的 application 的 classpath 中排除 Kafka broker jar

Apache Kafka Binder 使用管理实用程序作为 Apache Kafka 服务器 library 的一部分来创建和重新配置主题。如果在运行时不需要包含 Apache Kafka 服务器 library 及其依赖项,因为 application 将依赖于以管理方式配置的主题,Kafka binder 允许从 application 中排除 Apache Kafka 服务器依赖项。

如果您按照上面的建议对 Kafka 依赖项使用非默认版本,那么您所要做的就是不要包含 kafka broker 依赖项。如果使用默认的 Kafka version,请确保从spring-cloud-starter-stream-kafka依赖项中排除 kafka broker jar,如下所示。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
    </exclusion>
  </exclusions>
</dependency>

如果排除 Apache Kafka 服务器依赖项并且服务器上不存在 topic,则 Apache Kafka broker 将在服务器上启用 auto topic 创建时创建 topic。请记住,如果您依赖于此,那么 Kafka 服务器将使用默认的分区数和复制因子。另一方面,如果在服务器上禁用了自动 topic 创建,则必须小心 running application 以创建具有所需分区数的 topic。

如果要完全控制分区的分配方式,请保留默认设置 i.e。不要排除 kafka broker jar 并确保spring.cloud.stream.kafka.binder.autoCreateTopics设置为true,这是默认值。

37.4 Kafka Streams Binding Spring Cloud Stream 的功能

Spring Cloud Stream Kafka 支持还包括专为 Kafka Streams binding 设计的 binder。使用此 binder,可以编写利用 Kafka Streams API 的 applications。有关 Kafka Streams 的更多信息,请参阅Kafka Streams API 开发人员手册

_Spaf 中的 Kafka Streams 支持_Cloud Stream 基于 Spring Kafka 项目提供的基础。有关该支持的详细信息,请参阅Kafaka Streams 支持 Spring Kafka

以下是 Spring Cloud Stream KStream binder artifact 的 maven 坐标。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>

除了利用基于 Spring Boot 的 Spring Cloud Stream 编程 model 之外,KStream binder 提供的主要其他好处之一是它避免了在直接使用 Kafka Streams API 时需要编写的样板配置。通过 Kafka Streams API 提供的高 level 流 DSL 可以在当前支持中通过 Spring Cloud Stream 使用。

37.4.1 用法高级流的 DSL 示例

这个 application 将从 Kafka topic 中侦听,并为 5 秒 time 窗口中看到的每个唯一单词写入单词 count。

@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {

	@StreamListener("input")
	@SendTo("output")
	public KStream<?, String> process(KStream<?, String> input) {
		return input
				.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
				.map((key, word) -> new KeyValue<>(word, word))
				.groupByKey(Serdes.String(), Serdes.String())
				.count(TimeWindows.of(5000), "store-name")
				.toStream()
				.map((w, c) -> new KeyValue<>(null, "Count for " + w.key() + ": " + c));
	}

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}

如果你 build 它 string Boot runnable fat jar,你可以用以下方式运行上面的 example:

java -jar uber.jar  --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts

这意味着 application 将从传入的 Kafka topic 单词中侦听并写入输出 topic 计数。

Spring Cloud Stream 将确保来自传入和传出主题的消息都绑定为 KStream objects。正如人们可能观察到的那样,开发人员可以专注于 code,i.e 的业务方面。编写处理器中所需的逻辑,而不是设置 Kafka Streams 基础结构所需的特定于流的 configuration。所有这些样板都由 Spring Cloud Stream 在幕后处理。

37.4.2 支持交互式查询

如果交互式查询需要访问KafkaStreams,则可以通过KStreamBuilderFactoryBean.getKafkaStreams()访问内部KafkaStreams实例。您可以自动装配 KStream binder 提供的KStreamBuilderFactoryBean实例。然后你可以从中获取KafkaStreams实例并检索底层的 store,对它执行查询等。

37.4.3 Kafka Streams properties

Following are some examples of using this property.
spring.cloud.stream.kstream.binder.configuration.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs.

也可以有 binding 特定的 properties。

例如,您可以为输入或输出目的地使用不同的 Serde。

spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
Following is an example of using this property.
Values are provided in milliseconds.
spring.cloud.stream.kstream.timeWindow.length=5000
Following is an example of using this property.
Values are provided in milliseconds.
spring.cloud.stream.kstream.timeWindow.advanceBy=1000

37.5 错误 Channels

从 version 1.3 开始,binder 无条件地将 exceptions 发送到每个 consumer 目标的错误 channel,并且可以配置为将异步 producer 发送失败发送到错误 channel。有关更多信息,请参见名为“Message Channel Binder and Error Channels”的部分

发送失败的ErrorMessage的有效负载是带有 properties 的KafkaSendFailureException

没有自动处理这些 exceptions(例如发送到Dead-Letter 队列);您可以使用自己的 Spring Integration 流来使用这些 exceptions。

37.6 Kafka Metrics

Kafka binder 模块公开以下 metrics:

spring.cloud.stream.binder.kafka.someGroup.someTopic.lag - 此度量标准表示给定 binder group 的给定 binder 的 topic 尚未消耗的消息数量。对于 example,如果度量spring.cloud.stream.binder.kafka.myGroup.myTopic.lag的 value 是1000,那么 consumer group myGroup有等待从 topic myTopic消耗的1000消息。此指标对于向您选择的 PaaS 平台提供 auto-scaling 反馈特别有用。

37.7 Dead-Letter Topic Processing

由于无法预测用户如何处理 dead-lettered 消息,framework 不提供任何标准机制来处理它们。如果 dead-lettering 的原因是暂时的,您可能希望将消息 route 回原始的 topic。但是,如果问题是一个永久性问题,那么可能会导致无限循环。以下spring-boot application 是如何将这些消息重新路由回原始 topic 的示例,但在三次尝试后将它们移动到第三个“停车场”topic。 application 只是从 dead-letter topic 读取的另一个 spring-cloud-stream application。它在 5 秒内没有收到任何消息时终止。

这些示例假设原始目标是so8400out,consumer group 是so8400

有几个注意事项。

application.properties.

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries

应用.

@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}
首页