37. Apache Kafka Binder
37.1 Usage
To use the Apache Kafka binder, add it to your Spring Cloud Stream application, using the following Maven coordinates:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
Alternatively, you can use the Spring Cloud Stream Kafka Starter:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
37.2 Apache Kafka Binder Overview
A simplified diagram of how the Apache Kafka binder operates can be seen below.
Figure 37.1. Kafka Binder
The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic. The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions.
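For example, with the following core Spring Cloud Stream settings (the topic and group names are illustrative placeholders), the binder consumes the Kafka topic orders as part of the Kafka consumer group orderGroup:
spring.cloud.stream.bindings.input.destination=orders
spring.cloud.stream.bindings.input.group=orderGroup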
37.3 Configuration Options
This section contains the configuration options used by the Apache Kafka binder.
For common configuration options and properties pertaining to binders, refer to the core documentation.
37.3.1 Kafka Binder Properties
- spring.cloud.stream.kafka.binder.brokers
  A list of brokers to which the Kafka binder will connect.
  Default: localhost.
- spring.cloud.stream.kafka.binder.defaultBrokerPort
  brokers allows hosts specified with or without port information (e.g., host1,host2:port2). This sets the default port when no port is configured in the broker list.
  Default: 9092.
- spring.cloud.stream.kafka.binder.zkNodes
  A list of ZooKeeper nodes to which the Kafka binder can connect.
  Default: localhost.
- spring.cloud.stream.kafka.binder.defaultZkPort
  zkNodes allows hosts specified with or without port information (e.g., host1,host2:port2). This sets the default port when no port is configured in the node list.
  Default: 2181.
- spring.cloud.stream.kafka.binder.configuration
  Key/value map of client properties (both producers and consumers) passed to all clients created by the binder. Because these properties are used by both producers and consumers, usage should be restricted to common properties, especially security settings.
  Default: Empty map.
- spring.cloud.stream.kafka.binder.headers
  The list of custom headers that will be transported by the binder.
  Default: empty.
- spring.cloud.stream.kafka.binder.healthTimeout
  The time to wait to get partition information, in seconds. Health reports as down if this timer expires.
  Default: 10.
- spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow
  The frequency, in milliseconds, with which offsets are saved. Ignored if 0.
  Default: 10000.
- spring.cloud.stream.kafka.binder.offsetUpdateCount
  The frequency, in number of updates, with which consumed offsets are persisted. Ignored if 0. Mutually exclusive with offsetUpdateTimeWindow.
  Default: 0.
- spring.cloud.stream.kafka.binder.requiredAcks
  The number of required acks on the broker.
  Default: 1.
- spring.cloud.stream.kafka.binder.minPartitionCount
  Effective only if autoCreateTopics or autoAddPartitions is set. The global minimum number of partitions that the binder will configure on topics on which it produces or consumes data. It can be superseded by the partitionCount setting of the producer or by the value of instanceCount * concurrency settings of the producer (if either is larger).
  Default: 1.
- spring.cloud.stream.kafka.binder.replicationFactor
  The replication factor of auto-created topics if autoCreateTopics is active.
  Default: 1.
- spring.cloud.stream.kafka.binder.autoCreateTopics
  If set to true, the binder will create new topics automatically. If set to false, the binder will rely on the topics being already configured. In the latter case, if the topics do not exist, the binder will fail to start. Note that this setting is independent of the auto.topic.create.enable setting of the broker and does not influence it: if the server is set to auto-create topics, they may be created as part of the metadata retrieval request, with default broker settings.
  Default: true.
- spring.cloud.stream.kafka.binder.autoAddPartitions
  If set to true, the binder will add new partitions if required. If set to false, the binder will rely on the partition count of the topic being already configured. If the partition count of the target topic is smaller than the expected value, the binder will fail to start.
  Default: false.
- spring.cloud.stream.kafka.binder.socketBufferSize
  Size (in bytes) of the socket buffer to be used by the Kafka consumers.
  Default: 2097152.
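Putting several of these together, the following is a minimal sketch of a binder configuration (the host names are placeholders); note that kafka1.example.com picks up the defaultBrokerPort, while kafka2.example.com specifies its own:
spring.cloud.stream.kafka.binder.brokers=kafka1.example.com,kafka2.example.com:9093
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=zk1.example.com
spring.cloud.stream.kafka.binder.defaultZkPort=2181
spring.cloud.stream.kafka.binder.minPartitionCount=2
spring.cloud.stream.kafka.binder.autoAddPartitions=true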
37.3.2 Kafka Consumer Properties
The following properties are available for Kafka consumers only and must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.consumer..
- autoRebalanceEnabled
  When true, topic partitions will be automatically rebalanced between the members of a consumer group. When false, each consumer is assigned a fixed set of partitions based on spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex. This requires both the spring.cloud.stream.instanceCount and spring.cloud.stream.instanceIndex properties to be set appropriately on each launched instance. The spring.cloud.stream.instanceCount property must typically be greater than 1 in this case.
  Default: true.
- autoCommitOffset
  Whether to autocommit offsets when a message has been processed. If set to false, a header with the key kafka_acknowledgment of the type org.springframework.kafka.support.Acknowledgment will be present in the inbound message. Applications may use this header for acknowledging messages. See the examples section for details. When this property is set to false, the Kafka binder sets the ack mode to org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL.
  Default: true.
- autoCommitOnError
  Effective only if autoCommitOffset is set to true. If set to false, it suppresses auto-commits for messages that result in errors and commits only for successful messages; this allows a stream to automatically replay from the last successfully processed message in case of persistent failures. If set to true, it always auto-commits (if auto-commit is enabled). If not set (the default), it effectively has the same value as enableDlq, auto-committing erroneous messages if they are sent to a DLQ and not committing them otherwise.
  Default: not set.
- recoveryInterval
  The interval, in milliseconds, between connection recovery attempts.
  Default: 5000.
- startOffset
  The starting offset for new groups. Allowed values: earliest and latest. If the consumer group is set explicitly for the consumer 'binding' (via spring.cloud.stream.bindings.<channelName>.group), then 'startOffset' is set to earliest; otherwise it is set to latest for the anonymous consumer group.
  Default: null (equivalent to earliest).
- enableDlq
  When set to true, it enables DLQ behavior for the consumer. By default, messages that result in errors are forwarded to a topic named error.<destination>.<group>. The DLQ topic name can be configured via the property dlqName. This provides an alternative to the more common Kafka replay scenario for cases in which the number of errors is relatively small and replaying the entire original topic may be too cumbersome.
  Default: false.
- configuration
  Map with a key/value pair containing generic Kafka consumer properties.
  Default: Empty map.
- dlqName
  The name of the DLQ topic to receive the error messages.
  Default: null (if not specified, messages that result in errors are forwarded to a topic named error.<destination>.<group>).
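As an illustration, the following is a minimal sketch that combines several of these properties for a channel named input (the topic, group, and DLQ names are placeholders):
spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=myGroup
spring.cloud.stream.kafka.bindings.input.consumer.startOffset=earliest
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlqName=myTopic.dlq
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=100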
37.3.3 Kafka Producer Properties
The following properties are available for Kafka producers only and must be prefixed with spring.cloud.stream.kafka.bindings.<channelName>.producer..
- bufferSize
  Upper limit, in bytes, of how much data the Kafka producer will attempt to batch before sending.
  Default: 16384.
- sync
  Whether the producer is synchronous.
  Default: false.
- batchTimeout
  How long the producer waits before sending, in order to allow more messages to accumulate in the same batch. (Normally the producer does not wait at all, and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
  Default: 0.
- messageKeyExpression
  A SpEL expression evaluated against the outgoing message, used to populate the key of the produced Kafka message. For example: headers.key or payload.myKey.
  Default: none.
- configuration
  Map with a key/value pair containing generic Kafka producer properties.
  Default: Empty map.
The Kafka binder will use the partitionCount setting of the producer as a hint to create a topic with the given partition count (in conjunction with minPartitionCount, the maximum of the two being the value used). Exercise caution when configuring both minPartitionCount for a binder and partitionCount for an application, as the larger value will be used. If a topic already exists with a smaller partition count and autoAddPartitions is disabled (the default), the binder will fail to start. If a topic already exists with a smaller partition count and autoAddPartitions is enabled, new partitions will be added. If a topic already exists with a larger number of partitions than the maximum of minPartitionCount and partitionCount, the existing partition count will be used.
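Similarly, the following is a minimal sketch of a producer configuration for a channel named output (the topic and header names are placeholders): it sends synchronously, waits up to 100 ms to fill batches, and derives the Kafka message key from a message header:
spring.cloud.stream.bindings.output.destination=myTopic
spring.cloud.stream.bindings.output.producer.partitionCount=4
spring.cloud.stream.kafka.bindings.output.producer.sync=true
spring.cloud.stream.kafka.bindings.output.producer.batchTimeout=100
spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers.key
spring.cloud.stream.kafka.bindings.output.producer.configuration.compression.type=gzip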
37.3.4 Usage examples
In this section, we illustrate the use of the above properties for specific scenarios.
Example: setting autoCommitOffset to false and relying on manual acking.
This example illustrates how one may manually acknowledge offsets in a consumer application.
This example requires that spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset be set to false. Use the corresponding input channel name for your example.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowledgingConsumer {

    public static void main(String[] args) {
        SpringApplication.run(ManuallyAcknowledgingConsumer.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void process(Message<?> message) {
        // The acknowledgment header is only present when autoCommitOffset=false.
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    }
}
Example: security configuration
Apache Kafka 0.9 supports secure connections between clients and brokers. To take advantage of this feature, follow the guidelines in the Apache Kafka documentation as well as the Kafka 0.9 security guidelines from the Confluent documentation. Use the spring.cloud.stream.kafka.binder.configuration option to set security properties for all clients created by the binder.
For example, to set security.protocol to SASL_SSL, set:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
All the other security properties can be set in a similar manner.
When using Kerberos, follow the instructions in the reference documentation for creating and referencing the JAAS configuration.
Spring Cloud Stream supports passing JAAS configuration information to the application using a JAAS configuration file and using Spring Boot properties.
Using JAAS configuration files
The JAAS and (optionally) krb5 file locations can be set for Spring Cloud Stream applications by using system properties. Here is an example of launching a Spring Cloud Stream application with SASL and Kerberos using a JAAS configuration file:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
Using Spring Boot properties
As an alternative to having a JAAS configuration file, Spring Cloud Stream provides a mechanism for setting up the JAAS configuration for Spring Cloud Stream applications using Spring Boot properties.
The following properties can be used for configuring the login context of the Kafka client.
- spring.cloud.stream.kafka.binder.jaas.loginModule
  The login module name. Not necessary to be set in normal cases.
  Default: com.sun.security.auth.module.Krb5LoginModule.
- spring.cloud.stream.kafka.binder.jaas.controlFlag
  The control flag of the login module.
  Default: required.
- spring.cloud.stream.kafka.binder.jaas.options
  Map with a key/value pair containing the login module options.
  Default: Empty map.
Here is an example of launching a Spring Cloud Stream application with SASL and Kerberos using Spring Boot configuration properties:
java -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
This represents the equivalent of the following JAAS file:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client-1@EXAMPLE.COM";
};
If the required topics already exist on the broker, or will be created by an administrator, autocreation can be turned off and only the client JAAS properties need to be set. As an alternative to setting spring.cloud.stream.kafka.binder.autoCreateTopics, you can simply remove the broker dependency from the application. See the section called “Excluding Kafka broker jar from the classpath of the binder based application” for details.
Do not mix JAAS configuration files and Spring Boot properties in the same application. If the -Djava.security.auth.login.config system property is already present, Spring Cloud Stream will ignore the Spring Boot properties.
Exercise caution when using autoCreateTopics and autoAddPartitions with Kerberos. Usually applications use principals that do not have administrative rights in Kafka and ZooKeeper, and relying on Spring Cloud Stream to create or modify topics may fail. In secure environments, we strongly recommend creating topics and managing ACLs administratively using Kafka tooling.
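For example, a topic can be pre-created with the tooling that ships with the Kafka broker (the host, topic name, and sizing below are placeholders):
bin/kafka-topics.sh --create --zookeeper secure.zookeeper:2181 \
   --topic stream.ticktock --partitions 1 --replication-factor 1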
Using the binder with Apache Kafka 0.10
The default Kafka support in the Spring Cloud Stream Kafka binder is for Kafka version 0.10.1.1. The binder also supports connecting to other 0.10-based versions and 0.9 clients. In order to do this, when you create the project that contains your application, include spring-cloud-starter-stream-kafka as you normally would for the default binder. Then add these dependencies at the top of the <dependencies> section in the pom.xml file to override the dependencies.
Here is an example of downgrading your application to 0.10.0.1. Since it is still on the 0.10 line, the default spring-kafka and spring-integration-kafka versions can be retained.
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.10.0.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>
Here is another example, this time using the 0.9.0.1 version.
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.0.5.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-kafka</artifactId>
  <version>2.0.1.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.9.0.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.9.0.1</version>
</dependency>
The versions above are provided only for the sake of the example. For best results, we recommend using the most recent 0.10-compatible versions of the projects.
Excluding Kafka broker jar from the classpath of the binder based application
The Apache Kafka Binder uses the administrative utilities that are part of the Apache Kafka server library to create and reconfigure topics. If the inclusion of the Apache Kafka server library and its dependencies is not necessary at runtime because the application will rely on the topics being configured administratively, the Kafka binder allows for the Apache Kafka server dependency to be excluded from the application.
If you use non-default versions of the Kafka dependencies as advised above, all you have to do is not include the Kafka broker dependency. If you use the default Kafka version, ensure that you exclude the Kafka broker jar from the spring-cloud-starter-stream-kafka dependency, as follows:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
    </exclusion>
  </exclusions>
</dependency>
If you exclude the Apache Kafka server dependency and the topic is not present on the server, then the Apache Kafka broker will create the topic if auto topic creation is enabled on the server. Please keep in mind that if you are relying on this, then the Kafka server will use the default number of partitions and replication factors. On the other hand, if auto topic creation is disabled on the server, then care must be taken before running the application to create the topic with the desired number of partitions.
If you want full control over how partitions are allocated, leave the default settings as they are, i.e. do not exclude the Kafka broker jar, and ensure that spring.cloud.stream.kafka.binder.autoCreateTopics is set to true, which is the default.
37.4 Kafka Streams Binding Capabilities of Spring Cloud Stream
Spring Cloud Stream Kafka support also includes a binder designed specifically for Kafka Streams binding. Using this binder, applications can be written that leverage the Kafka Streams API. For more information on Kafka Streams, see the Kafka Streams API Developer Manual.
Kafka Streams support in Spring Cloud Stream is based on the foundations provided by the Spring Kafka project. For details on that support, see Kafka Streams Support in Spring Kafka.
Here are the Maven coordinates for the Spring Cloud Stream KStream binder artifact:
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kstream</artifactId>
</dependency>
In addition to leveraging the Spring Cloud Stream programming model (which is based on Spring Boot), one of the main benefits the KStream binder provides is that it avoids the boilerplate configuration that one needs to write when using the Kafka Streams API directly. The high-level streams DSL provided by the Kafka Streams API can be used through Spring Cloud Stream in the current support.
37.4.1 Usage example of high level streams DSL
This application listens to a Kafka topic and writes the word count for each unique word it sees in a 5-second time window.
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, String> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .groupByKey(Serdes.String(), Serdes.String())
                .count(TimeWindows.of(5000), "store-name")
                .toStream()
                .map((w, c) -> new KeyValue<>(null, "Count for " + w.key() + ": " + c));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}
If you build it as a Spring Boot runnable fat JAR, you can run the above example in the following way:
java -jar uber.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts
This means that the application will listen to the incoming Kafka topic words and write to the output topic counts.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are bound as KStream objects. The developer can thus focus exclusively on the business aspects of the code, i.e. writing the logic required in the processor, rather than setting up the streams-specific configuration required by the Kafka Streams infrastructure. All of that boilerplate is handled by Spring Cloud Stream behind the scenes.
37.4.2 Support for interactive queries
If access to the KafkaStreams instance is needed for interactive queries, the internal KafkaStreams instance can be accessed via KStreamBuilderFactoryBean.getKafkaStreams(). You can autowire the KStreamBuilderFactoryBean instance provided by the KStream binder, obtain the KafkaStreams instance from it, and then retrieve the underlying store, execute queries on it, and so on, as shown in the sketch below.
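The following is a minimal sketch of such a query service against the word-count example above (the class and method names are hypothetical; "store-name" refers to the state store created in that example):
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KStreamBuilderFactoryBean;
import org.springframework.stereotype.Component;

@Component
public class WordCountQueryService {

    @Autowired
    private KStreamBuilderFactoryBean kStreamBuilderFactoryBean;

    // Sums the windowed counts recorded for the given word between the two timestamps.
    public long countForWord(String word, long fromMillis, long toMillis) {
        KafkaStreams kafkaStreams = kStreamBuilderFactoryBean.getKafkaStreams();
        ReadOnlyWindowStore<String, Long> store =
                kafkaStreams.store("store-name", QueryableStoreTypes.windowStore());
        long total = 0;
        try (WindowStoreIterator<Long> iterator = store.fetch(word, fromMillis, toMillis)) {
            while (iterator.hasNext()) {
                total += iterator.next().value;
            }
        }
        return total;
    }
}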
37.4.3 Kafka Streams properties
- configuration
  Map with a key/value pair containing properties pertaining to the Kafka Streams API. This property must be prefixed with spring.cloud.stream.kstream.binder..
Following are some examples of using this property.
spring.cloud.stream.kstream.binder.configuration.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs.
There can also be binding-specific properties. For instance, you can use a different Serde for your input or output destination:
spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde
- timeWindow.length
  Many streaming applications written using Kafka Streams involve windowing operations. If you specify this property, an org.apache.kafka.streams.kstream.TimeWindows bean is automatically provided and can be autowired in applications. This property must be prefixed with spring.cloud.stream.kstream.. A bean of type org.apache.kafka.streams.kstream.TimeWindows is created only if this property is provided.
Following is an example of using this property.
Values are provided in milliseconds.
spring.cloud.stream.kstream.timeWindow.length=5000
- timeWindow.advanceBy
  This property goes hand in hand with timeWindow.length and has no effect on its own. If you provide this property, the generated org.apache.kafka.streams.kstream.TimeWindows bean will automatically contain this information. This property must be prefixed with spring.cloud.stream.kstream..
Following is an example of using this property.
Values are provided in milliseconds.
spring.cloud.stream.kstream.timeWindow.advanceBy=1000
37.5 Error Channels
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination, and can be configured to send async producer send failures to an error channel too. See the section called “Message Channel Binders and Error Channels” for more information.
The payload of the ErrorMessage for a send failure is a KafkaSendFailureException with properties:
- failedMessage - the spring-messaging Message<?> that failed to be sent.
- record - the raw ProducerRecord that was created from the failedMessage.
There is no automatic handling of these exceptions (such as sending to a Dead-Letter queue); you can consume these exceptions with your own Spring Integration flow.
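As a hedged sketch, such a flow can be as simple as a service activator; the class name and the destination myTopic below are placeholders, and the error channel name is assumed to follow the convention described in the section called “Message Channel Binders and Error Channels”:
import org.springframework.cloud.stream.binder.kafka.KafkaSendFailureException;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Component;

@Component
public class ProducerErrorHandler {

    // Subscribes to the error channel for the "myTopic" destination (name assumed).
    @ServiceActivator(inputChannel = "myTopic.errors")
    public void handle(ErrorMessage errorMessage) {
        KafkaSendFailureException failure = (KafkaSendFailureException) errorMessage.getPayload();
        // failedMessage is the spring-messaging message; record is the raw ProducerRecord.
        System.out.println("Failed to send " + failure.getFailedMessage()
                + " as " + failure.getRecord());
    }
}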
37.6 Kafka Metrics
The Kafka binder module exposes the following metric:
spring.cloud.stream.binder.kafka.someGroup.someTopic.lag - this metric indicates how many messages have not yet been consumed from a given binder’s topic by a given consumer group. For example, if the value of the metric spring.cloud.stream.binder.kafka.myGroup.myTopic.lag is 1000, then the consumer group myGroup has 1000 messages waiting to be consumed from the topic myTopic. This metric is particularly useful for providing auto-scaling feedback to the PaaS platform of your choice.
37.7 Dead-Letter Topic Processing
Because it can’t be anticipated how users would want to dispose of dead-lettered messages, the framework does not provide any standard mechanism to handle them. If the reason for the dead-lettering is transient, you may wish to route the messages back to the original topic. However, if the problem is a permanent issue, that could cause an infinite loop. The following spring-boot application is an example of how to route those messages back to the original topic, but it moves them to a third "parking lot" topic after three attempts. The application is simply another spring-cloud-stream application that reads from the dead-letter topic. It terminates when no messages are received for 5 seconds.
The examples assume the original destination is so8400out and the consumer group is so8400.
There are several considerations:
- Consider only running the rerouting when the main application is not running. Otherwise, the retries for transient errors will be used up very quickly.
- Alternatively, use a two-stage approach: use this application to route to a third topic, and another to route from there back to the main topic.
- Since this technique uses a message header to keep track of retries, it won’t work with headerMode=raw. In that case, consider adding some data to the payload (that can be ignored by the main application).
- x-retries has to be added to the headers property (spring.cloud.stream.kafka.binder.headers=x-retries) on both this application and the main application so that the header is transported between them.
- Since Kafka is publish/subscribe, replayed messages will be sent to each consumer group, even those that successfully processed a message the first time around.
application.properties.
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.output.producer.partitioned=true
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.bindings.parkingLot.producer.partitioned=true
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
Application.
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, terminating");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}