001/*
002 * Copyright 2012-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.boot.autoconfigure.kafka;
018
019import java.io.IOException;
020import java.time.Duration;
021import java.time.temporal.ChronoUnit;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.List;
026import java.util.Map;
027
028import org.apache.kafka.clients.CommonClientConfigs;
029import org.apache.kafka.clients.consumer.ConsumerConfig;
030import org.apache.kafka.clients.producer.ProducerConfig;
031import org.apache.kafka.common.config.SslConfigs;
032import org.apache.kafka.common.serialization.StringDeserializer;
033import org.apache.kafka.common.serialization.StringSerializer;
034
035import org.springframework.boot.context.properties.ConfigurationProperties;
036import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
037import org.springframework.boot.context.properties.PropertyMapper;
038import org.springframework.boot.convert.DurationUnit;
039import org.springframework.core.io.Resource;
040import org.springframework.kafka.listener.ContainerProperties.AckMode;
041import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
042import org.springframework.util.CollectionUtils;
043import org.springframework.util.unit.DataSize;
044
045/**
046 * Configuration properties for Spring for Apache Kafka.
047 * <p>
048 * Users should refer to Kafka documentation for complete descriptions of these
049 * properties.
050 *
051 * @author Gary Russell
052 * @author Stephane Nicoll
053 * @author Artem Bilan
054 * @author Nakul Mishra
055 * @since 1.5.0
056 */
057@ConfigurationProperties(prefix = "spring.kafka")
058public class KafkaProperties {
059
        /**
         * Comma-delimited list of host:port pairs to use for establishing the initial
         * connections to the Kafka cluster. Applies to all components unless overridden.
         */
        private List<String> bootstrapServers = new ArrayList<>(
                        Collections.singletonList("localhost:9092"));

        /**
         * ID to pass to the server when making requests. Used for server-side logging.
         */
        private String clientId;

        /**
         * Additional properties, common to producers and consumers, used to configure the
         * client.
         */
        private final Map<String, String> properties = new HashMap<>();

        // Consumer-specific settings; layered over the common ones by
        // buildConsumerProperties().
        private final Consumer consumer = new Consumer();

        // Producer-specific settings; layered over the common ones by
        // buildProducerProperties().
        private final Producer producer = new Producer();

        // Admin-specific settings; layered over the common ones by buildAdminProperties().
        private final Admin admin = new Admin();

        // Streams-specific settings; layered over the common ones by
        // buildStreamsProperties().
        private final Streams streams = new Streams();

        // Listener settings; only exposed through getListener() in this class.
        private final Listener listener = new Listener();

        // Shared SSL settings; their buildProperties() output is added to the common
        // properties by buildCommonProperties().
        private final Ssl ssl = new Ssl();

        // JAAS settings; only exposed through getJaas() in this class.
        private final Jaas jaas = new Jaas();

        // Template settings; only exposed through getTemplate() in this class.
        private final Template template = new Template();
093
        /**
         * Return the bootstrap servers that are shared by all components.
         * @return the current value of the {@code bootstrapServers} field
         */
        public List<String> getBootstrapServers() {
                return this.bootstrapServers;
        }
097
        /**
         * Replace the bootstrap servers that are shared by all components. The given list
         * is stored as-is (no copy is made).
         * @param bootstrapServers the host:port pairs to use
         */
        public void setBootstrapServers(List<String> bootstrapServers) {
                this.bootstrapServers = bootstrapServers;
        }
101
        /**
         * Return the client ID shared by all components.
         * @return the client ID, or {@code null} if none has been set
         */
        public String getClientId() {
                return this.clientId;
        }
105
        /**
         * Set the client ID shared by all components.
         * @param clientId the client ID to use
         */
        public void setClientId(String clientId) {
                this.clientId = clientId;
        }
109
        /**
         * Return the additional common properties. The returned map is the live instance
         * held by this object, so entries added to it are seen by
         * {@code buildCommonProperties()}.
         * @return the mutable map of additional properties
         */
        public Map<String, String> getProperties() {
                return this.properties;
        }
113
        /**
         * Return the consumer-specific settings.
         * @return the {@link Consumer} instance owned by this object
         */
        public Consumer getConsumer() {
                return this.consumer;
        }
117
        /**
         * Return the producer-specific settings.
         * @return the {@link Producer} instance owned by this object
         */
        public Producer getProducer() {
                return this.producer;
        }
121
        /**
         * Return the listener settings.
         * @return the {@link Listener} instance owned by this object
         */
        public Listener getListener() {
                return this.listener;
        }
125
        /**
         * Return the admin-specific settings.
         * @return the {@link Admin} instance owned by this object
         */
        public Admin getAdmin() {
                return this.admin;
        }
129
        /**
         * Return the streams-specific settings.
         * @return the {@link Streams} instance owned by this object
         */
        public Streams getStreams() {
                return this.streams;
        }
133
        /**
         * Return the SSL settings shared by all components.
         * @return the {@code Ssl} instance owned by this object
         */
        public Ssl getSsl() {
                return this.ssl;
        }
137
        /**
         * Return the JAAS settings.
         * @return the {@code Jaas} instance owned by this object
         */
        public Jaas getJaas() {
                return this.jaas;
        }
141
        /**
         * Return the template settings.
         * @return the {@link Template} instance owned by this object
         */
        public Template getTemplate() {
                return this.template;
        }
145
146        private Map<String, Object> buildCommonProperties() {
147                Map<String, Object> properties = new HashMap<>();
148                if (this.bootstrapServers != null) {
149                        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
150                                        this.bootstrapServers);
151                }
152                if (this.clientId != null) {
153                        properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
154                }
155                properties.putAll(this.ssl.buildProperties());
156                if (!CollectionUtils.isEmpty(this.properties)) {
157                        properties.putAll(this.properties);
158                }
159                return properties;
160        }
161
162        /**
163         * Create an initial map of consumer properties from the state of this instance.
164         * <p>
165         * This allows you to add additional properties, if necessary, and override the
166         * default kafkaConsumerFactory bean.
167         * @return the consumer properties initialized with the customizations defined on this
168         * instance
169         */
170        public Map<String, Object> buildConsumerProperties() {
171                Map<String, Object> properties = buildCommonProperties();
172                properties.putAll(this.consumer.buildProperties());
173                return properties;
174        }
175
176        /**
177         * Create an initial map of producer properties from the state of this instance.
178         * <p>
179         * This allows you to add additional properties, if necessary, and override the
180         * default kafkaProducerFactory bean.
181         * @return the producer properties initialized with the customizations defined on this
182         * instance
183         */
184        public Map<String, Object> buildProducerProperties() {
185                Map<String, Object> properties = buildCommonProperties();
186                properties.putAll(this.producer.buildProperties());
187                return properties;
188        }
189
190        /**
191         * Create an initial map of admin properties from the state of this instance.
192         * <p>
193         * This allows you to add additional properties, if necessary, and override the
194         * default kafkaAdmin bean.
195         * @return the admin properties initialized with the customizations defined on this
196         * instance
197         */
198        public Map<String, Object> buildAdminProperties() {
199                Map<String, Object> properties = buildCommonProperties();
200                properties.putAll(this.admin.buildProperties());
201                return properties;
202        }
203
204        /**
205         * Create an initial map of streams properties from the state of this instance.
206         * <p>
207         * This allows you to add additional properties, if necessary.
208         * @return the streams properties initialized with the customizations defined on this
209         * instance
210         */
211        public Map<String, Object> buildStreamsProperties() {
212                Map<String, Object> properties = buildCommonProperties();
213                properties.putAll(this.streams.buildProperties());
214                return properties;
215        }
216
217        public static class Consumer {
218
219                private final Ssl ssl = new Ssl();
220
221                /**
222                 * Frequency with which the consumer offsets are auto-committed to Kafka if
223                 * 'enable.auto.commit' is set to true.
224                 */
225                private Duration autoCommitInterval;
226
227                /**
228                 * What to do when there is no initial offset in Kafka or if the current offset no
229                 * longer exists on the server.
230                 */
231                private String autoOffsetReset;
232
233                /**
234                 * Comma-delimited list of host:port pairs to use for establishing the initial
235                 * connections to the Kafka cluster. Overrides the global property, for consumers.
236                 */
237                private List<String> bootstrapServers;
238
239                /**
240                 * ID to pass to the server when making requests. Used for server-side logging.
241                 */
242                private String clientId;
243
244                /**
245                 * Whether the consumer's offset is periodically committed in the background.
246                 */
247                private Boolean enableAutoCommit;
248
249                /**
250                 * Maximum amount of time the server blocks before answering the fetch request if
251                 * there isn't sufficient data to immediately satisfy the requirement given by
252                 * "fetch-min-size".
253                 */
254                private Duration fetchMaxWait;
255
256                /**
257                 * Minimum amount of data the server should return for a fetch request.
258                 */
259                private DataSize fetchMinSize;
260
261                /**
262                 * Unique string that identifies the consumer group to which this consumer
263                 * belongs.
264                 */
265                private String groupId;
266
267                /**
268                 * Expected time between heartbeats to the consumer coordinator.
269                 */
270                private Duration heartbeatInterval;
271
272                /**
273                 * Deserializer class for keys.
274                 */
275                private Class<?> keyDeserializer = StringDeserializer.class;
276
277                /**
278                 * Deserializer class for values.
279                 */
280                private Class<?> valueDeserializer = StringDeserializer.class;
281
282                /**
283                 * Maximum number of records returned in a single call to poll().
284                 */
285                private Integer maxPollRecords;
286
287                /**
288                 * Additional consumer-specific properties used to configure the client.
289                 */
290                private final Map<String, String> properties = new HashMap<>();
291
292                public Ssl getSsl() {
293                        return this.ssl;
294                }
295
296                public Duration getAutoCommitInterval() {
297                        return this.autoCommitInterval;
298                }
299
300                public void setAutoCommitInterval(Duration autoCommitInterval) {
301                        this.autoCommitInterval = autoCommitInterval;
302                }
303
304                public String getAutoOffsetReset() {
305                        return this.autoOffsetReset;
306                }
307
308                public void setAutoOffsetReset(String autoOffsetReset) {
309                        this.autoOffsetReset = autoOffsetReset;
310                }
311
312                public List<String> getBootstrapServers() {
313                        return this.bootstrapServers;
314                }
315
316                public void setBootstrapServers(List<String> bootstrapServers) {
317                        this.bootstrapServers = bootstrapServers;
318                }
319
320                public String getClientId() {
321                        return this.clientId;
322                }
323
324                public void setClientId(String clientId) {
325                        this.clientId = clientId;
326                }
327
328                public Boolean getEnableAutoCommit() {
329                        return this.enableAutoCommit;
330                }
331
332                public void setEnableAutoCommit(Boolean enableAutoCommit) {
333                        this.enableAutoCommit = enableAutoCommit;
334                }
335
336                public Duration getFetchMaxWait() {
337                        return this.fetchMaxWait;
338                }
339
340                public void setFetchMaxWait(Duration fetchMaxWait) {
341                        this.fetchMaxWait = fetchMaxWait;
342                }
343
344                public DataSize getFetchMinSize() {
345                        return this.fetchMinSize;
346                }
347
348                public void setFetchMinSize(DataSize fetchMinSize) {
349                        this.fetchMinSize = fetchMinSize;
350                }
351
352                public String getGroupId() {
353                        return this.groupId;
354                }
355
356                public void setGroupId(String groupId) {
357                        this.groupId = groupId;
358                }
359
360                public Duration getHeartbeatInterval() {
361                        return this.heartbeatInterval;
362                }
363
364                public void setHeartbeatInterval(Duration heartbeatInterval) {
365                        this.heartbeatInterval = heartbeatInterval;
366                }
367
368                public Class<?> getKeyDeserializer() {
369                        return this.keyDeserializer;
370                }
371
372                public void setKeyDeserializer(Class<?> keyDeserializer) {
373                        this.keyDeserializer = keyDeserializer;
374                }
375
376                public Class<?> getValueDeserializer() {
377                        return this.valueDeserializer;
378                }
379
380                public void setValueDeserializer(Class<?> valueDeserializer) {
381                        this.valueDeserializer = valueDeserializer;
382                }
383
384                public Integer getMaxPollRecords() {
385                        return this.maxPollRecords;
386                }
387
388                public void setMaxPollRecords(Integer maxPollRecords) {
389                        this.maxPollRecords = maxPollRecords;
390                }
391
392                public Map<String, String> getProperties() {
393                        return this.properties;
394                }
395
396                public Map<String, Object> buildProperties() {
397                        Properties properties = new Properties();
398                        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
399                        map.from(this::getAutoCommitInterval).asInt(Duration::toMillis)
400                                        .to(properties.in(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
401                        map.from(this::getAutoOffsetReset)
402                                        .to(properties.in(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
403                        map.from(this::getBootstrapServers)
404                                        .to(properties.in(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
405                        map.from(this::getClientId)
406                                        .to(properties.in(ConsumerConfig.CLIENT_ID_CONFIG));
407                        map.from(this::getEnableAutoCommit)
408                                        .to(properties.in(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
409                        map.from(this::getFetchMaxWait).asInt(Duration::toMillis)
410                                        .to(properties.in(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG));
411                        map.from(this::getFetchMinSize).asInt(DataSize::toBytes)
412                                        .to(properties.in(ConsumerConfig.FETCH_MIN_BYTES_CONFIG));
413                        map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
414                        map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
415                                        .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
416                        map.from(this::getKeyDeserializer)
417                                        .to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
418                        map.from(this::getValueDeserializer)
419                                        .to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
420                        map.from(this::getMaxPollRecords)
421                                        .to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
422                        return properties.with(this.ssl, this.properties);
423                }
424
425        }
426
427        public static class Producer {
428
429                private final Ssl ssl = new Ssl();
430
431                /**
432                 * Number of acknowledgments the producer requires the leader to have received
433                 * before considering a request complete.
434                 */
435                private String acks;
436
437                /**
438                 * Default batch size. A small batch size will make batching less common and may
439                 * reduce throughput (a batch size of zero disables batching entirely).
440                 */
441                private DataSize batchSize;
442
443                /**
444                 * Comma-delimited list of host:port pairs to use for establishing the initial
445                 * connections to the Kafka cluster. Overrides the global property, for producers.
446                 */
447                private List<String> bootstrapServers;
448
449                /**
450                 * Total memory size the producer can use to buffer records waiting to be sent to
451                 * the server.
452                 */
453                private DataSize bufferMemory;
454
455                /**
456                 * ID to pass to the server when making requests. Used for server-side logging.
457                 */
458                private String clientId;
459
460                /**
461                 * Compression type for all data generated by the producer.
462                 */
463                private String compressionType;
464
465                /**
466                 * Serializer class for keys.
467                 */
468                private Class<?> keySerializer = StringSerializer.class;
469
470                /**
471                 * Serializer class for values.
472                 */
473                private Class<?> valueSerializer = StringSerializer.class;
474
475                /**
476                 * When greater than zero, enables retrying of failed sends.
477                 */
478                private Integer retries;
479
480                /**
481                 * When non empty, enables transaction support for producer.
482                 */
483                private String transactionIdPrefix;
484
485                /**
486                 * Additional producer-specific properties used to configure the client.
487                 */
488                private final Map<String, String> properties = new HashMap<>();
489
490                public Ssl getSsl() {
491                        return this.ssl;
492                }
493
494                public String getAcks() {
495                        return this.acks;
496                }
497
498                public void setAcks(String acks) {
499                        this.acks = acks;
500                }
501
502                public DataSize getBatchSize() {
503                        return this.batchSize;
504                }
505
506                public void setBatchSize(DataSize batchSize) {
507                        this.batchSize = batchSize;
508                }
509
510                public List<String> getBootstrapServers() {
511                        return this.bootstrapServers;
512                }
513
514                public void setBootstrapServers(List<String> bootstrapServers) {
515                        this.bootstrapServers = bootstrapServers;
516                }
517
518                public DataSize getBufferMemory() {
519                        return this.bufferMemory;
520                }
521
522                public void setBufferMemory(DataSize bufferMemory) {
523                        this.bufferMemory = bufferMemory;
524                }
525
526                public String getClientId() {
527                        return this.clientId;
528                }
529
530                public void setClientId(String clientId) {
531                        this.clientId = clientId;
532                }
533
534                public String getCompressionType() {
535                        return this.compressionType;
536                }
537
538                public void setCompressionType(String compressionType) {
539                        this.compressionType = compressionType;
540                }
541
542                public Class<?> getKeySerializer() {
543                        return this.keySerializer;
544                }
545
546                public void setKeySerializer(Class<?> keySerializer) {
547                        this.keySerializer = keySerializer;
548                }
549
550                public Class<?> getValueSerializer() {
551                        return this.valueSerializer;
552                }
553
554                public void setValueSerializer(Class<?> valueSerializer) {
555                        this.valueSerializer = valueSerializer;
556                }
557
558                public Integer getRetries() {
559                        return this.retries;
560                }
561
562                public void setRetries(Integer retries) {
563                        this.retries = retries;
564                }
565
566                public String getTransactionIdPrefix() {
567                        return this.transactionIdPrefix;
568                }
569
570                public void setTransactionIdPrefix(String transactionIdPrefix) {
571                        this.transactionIdPrefix = transactionIdPrefix;
572                }
573
574                public Map<String, String> getProperties() {
575                        return this.properties;
576                }
577
578                public Map<String, Object> buildProperties() {
579                        Properties properties = new Properties();
580                        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
581                        map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG));
582                        map.from(this::getBatchSize).asInt(DataSize::toBytes)
583                                        .to(properties.in(ProducerConfig.BATCH_SIZE_CONFIG));
584                        map.from(this::getBootstrapServers)
585                                        .to(properties.in(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
586                        map.from(this::getBufferMemory).as(DataSize::toBytes)
587                                        .to(properties.in(ProducerConfig.BUFFER_MEMORY_CONFIG));
588                        map.from(this::getClientId)
589                                        .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
590                        map.from(this::getCompressionType)
591                                        .to(properties.in(ProducerConfig.COMPRESSION_TYPE_CONFIG));
592                        map.from(this::getKeySerializer)
593                                        .to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
594                        map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG));
595                        map.from(this::getValueSerializer)
596                                        .to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
597                        return properties.with(this.ssl, this.properties);
598                }
599
600        }
601
602        public static class Admin {
603
604                private final Ssl ssl = new Ssl();
605
606                /**
607                 * ID to pass to the server when making requests. Used for server-side logging.
608                 */
609                private String clientId;
610
611                /**
612                 * Additional admin-specific properties used to configure the client.
613                 */
614                private final Map<String, String> properties = new HashMap<>();
615
616                /**
617                 * Whether to fail fast if the broker is not available on startup.
618                 */
619                private boolean failFast;
620
621                public Ssl getSsl() {
622                        return this.ssl;
623                }
624
625                public String getClientId() {
626                        return this.clientId;
627                }
628
629                public void setClientId(String clientId) {
630                        this.clientId = clientId;
631                }
632
633                public boolean isFailFast() {
634                        return this.failFast;
635                }
636
637                public void setFailFast(boolean failFast) {
638                        this.failFast = failFast;
639                }
640
641                public Map<String, String> getProperties() {
642                        return this.properties;
643                }
644
645                public Map<String, Object> buildProperties() {
646                        Properties properties = new Properties();
647                        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
648                        map.from(this::getClientId)
649                                        .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
650                        return properties.with(this.ssl, this.properties);
651                }
652
653        }
654
655        /**
656         * High (and some medium) priority Streams properties and a general properties bucket.
657         */
658        public static class Streams {
659
660                private final Ssl ssl = new Ssl();
661
662                /**
663                 * Kafka streams application.id property; default spring.application.name.
664                 */
665                private String applicationId;
666
667                /**
668                 * Whether or not to auto-start the streams factory bean.
669                 */
670                private boolean autoStartup = true;
671
672                /**
673                 * Comma-delimited list of host:port pairs to use for establishing the initial
674                 * connections to the Kafka cluster. Overrides the global property, for streams.
675                 */
676                private List<String> bootstrapServers;
677
678                /**
679                 * Maximum memory size to be used for buffering across all threads.
680                 */
681                private DataSize cacheMaxSizeBuffering;
682
683                /**
684                 * ID to pass to the server when making requests. Used for server-side logging.
685                 */
686                private String clientId;
687
688                /**
689                 * The replication factor for change log topics and repartition topics created by
690                 * the stream processing application.
691                 */
692                private Integer replicationFactor;
693
694                /**
695                 * Directory location for the state store.
696                 */
697                private String stateDir;
698
699                /**
700                 * Additional Kafka properties used to configure the streams.
701                 */
702                private final Map<String, String> properties = new HashMap<>();
703
704                public Ssl getSsl() {
705                        return this.ssl;
706                }
707
708                public String getApplicationId() {
709                        return this.applicationId;
710                }
711
712                public void setApplicationId(String applicationId) {
713                        this.applicationId = applicationId;
714                }
715
716                public boolean isAutoStartup() {
717                        return this.autoStartup;
718                }
719
720                public void setAutoStartup(boolean autoStartup) {
721                        this.autoStartup = autoStartup;
722                }
723
724                public List<String> getBootstrapServers() {
725                        return this.bootstrapServers;
726                }
727
728                public void setBootstrapServers(List<String> bootstrapServers) {
729                        this.bootstrapServers = bootstrapServers;
730                }
731
732                @DeprecatedConfigurationProperty(replacement = "spring.kafka.streams.cache-max-size-buffering")
733                @Deprecated
734                public Integer getCacheMaxBytesBuffering() {
735                        return (this.cacheMaxSizeBuffering != null)
736                                        ? (int) this.cacheMaxSizeBuffering.toBytes() : null;
737                }
738
739                @Deprecated
740                public void setCacheMaxBytesBuffering(Integer cacheMaxBytesBuffering) {
741                        DataSize cacheMaxSizeBuffering = (cacheMaxBytesBuffering != null)
742                                        ? DataSize.ofBytes(cacheMaxBytesBuffering) : null;
743                        setCacheMaxSizeBuffering(cacheMaxSizeBuffering);
744                }
745
746                public DataSize getCacheMaxSizeBuffering() {
747                        return this.cacheMaxSizeBuffering;
748                }
749
750                public void setCacheMaxSizeBuffering(DataSize cacheMaxSizeBuffering) {
751                        this.cacheMaxSizeBuffering = cacheMaxSizeBuffering;
752                }
753
754                public String getClientId() {
755                        return this.clientId;
756                }
757
758                public void setClientId(String clientId) {
759                        this.clientId = clientId;
760                }
761
762                public Integer getReplicationFactor() {
763                        return this.replicationFactor;
764                }
765
766                public void setReplicationFactor(Integer replicationFactor) {
767                        this.replicationFactor = replicationFactor;
768                }
769
770                public String getStateDir() {
771                        return this.stateDir;
772                }
773
774                public void setStateDir(String stateDir) {
775                        this.stateDir = stateDir;
776                }
777
                /**
                 * Return the {@code properties} map held by this object. The map itself is
                 * returned, not a copy. {@code buildProperties()} merges its entries into the
                 * result after all other entries.
                 * @return the properties map
                 */
                public Map<String, String> getProperties() {
                        return this.properties;
                }
781
782                public Map<String, Object> buildProperties() {
783                        Properties properties = new Properties();
784                        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
785                        map.from(this::getApplicationId).to(properties.in("application.id"));
786                        map.from(this::getBootstrapServers)
787                                        .to(properties.in(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
788                        map.from(this::getCacheMaxSizeBuffering).asInt(DataSize::toBytes)
789                                        .to(properties.in("cache.max.bytes.buffering"));
790                        map.from(this::getClientId)
791                                        .to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
792                        map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
793                        map.from(this::getStateDir).to(properties.in("state.dir"));
794                        return properties.with(this.ssl, this.properties);
795                }
796
797        }
798
        /**
         * Template settings. Holds a single optional value, the default topic, which is
         * {@code null} until set.
         */
        public static class Template {

                /**
                 * Default topic to which messages are sent.
                 */
                private String defaultTopic;

                /**
                 * Return the default topic as currently stored.
                 * @return the default topic, or {@code null} if none has been set
                 */
                public String getDefaultTopic() {
                        return this.defaultTopic;
                }

                /**
                 * Store the default topic. The value is kept as given, without validation.
                 * @param defaultTopic the default topic, may be {@code null}
                 */
                public void setDefaultTopic(String defaultTopic) {
                        this.defaultTopic = defaultTopic;
                }

        }
815
        /**
         * Listener settings: the listener type plus optional acknowledgement, polling,
         * concurrency, monitoring and logging options. Only {@code type} has a default
         * ({@link Type#SINGLE}); every other value stays {@code null} until it is set.
         */
        public static class Listener {

                /**
                 * Kinds of listener, distinguishing invocation with a single record from
                 * invocation with a batch of records.
                 */
                public enum Type {

                        /**
                         * Invokes the endpoint with one ConsumerRecord at a time.
                         */
                        SINGLE,

                        /**
                         * Invokes the endpoint with a batch of ConsumerRecords.
                         */
                        BATCH

                }

                /**
                 * Listener type.
                 */
                private Type type = Type.SINGLE;

                /**
                 * Listener AckMode. See the spring-kafka documentation.
                 */
                private AckMode ackMode;

                /**
                 * Prefix for the listener's consumer client.id property.
                 */
                private String clientId;

                /**
                 * Number of threads to run in the listener containers.
                 */
                private Integer concurrency;

                /**
                 * Timeout to use when polling the consumer.
                 */
                private Duration pollTimeout;

                /**
                 * Multiplier applied to "pollTimeout" to determine if a consumer is
                 * non-responsive.
                 */
                private Float noPollThreshold;

                /**
                 * Number of records between offset commits when ackMode is "COUNT" or
                 * "COUNT_TIME".
                 */
                private Integer ackCount;

                /**
                 * Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
                 */
                private Duration ackTime;

                /**
                 * Time between publishing idle consumer events (no data received).
                 */
                private Duration idleEventInterval;

                /**
                 * Time between checks for non-responsive consumers. If a duration suffix is not
                 * specified, seconds will be used.
                 */
                @DurationUnit(ChronoUnit.SECONDS)
                private Duration monitorInterval;

                /**
                 * Whether to log the container configuration during initialization (INFO level).
                 */
                private Boolean logContainerConfig;

                // Plain accessors for the fields declared above. None of them validates or
                // transforms the value: getters return the stored reference and setters
                // store the argument as given (including null).

                public Type getType() {
                        return this.type;
                }

                public void setType(Type type) {
                        this.type = type;
                }

                public AckMode getAckMode() {
                        return this.ackMode;
                }

                public void setAckMode(AckMode ackMode) {
                        this.ackMode = ackMode;
                }

                public String getClientId() {
                        return this.clientId;
                }

                public void setClientId(String clientId) {
                        this.clientId = clientId;
                }

                public Integer getConcurrency() {
                        return this.concurrency;
                }

                public void setConcurrency(Integer concurrency) {
                        this.concurrency = concurrency;
                }

                public Duration getPollTimeout() {
                        return this.pollTimeout;
                }

                public void setPollTimeout(Duration pollTimeout) {
                        this.pollTimeout = pollTimeout;
                }

                public Float getNoPollThreshold() {
                        return this.noPollThreshold;
                }

                public void setNoPollThreshold(Float noPollThreshold) {
                        this.noPollThreshold = noPollThreshold;
                }

                public Integer getAckCount() {
                        return this.ackCount;
                }

                public void setAckCount(Integer ackCount) {
                        this.ackCount = ackCount;
                }

                public Duration getAckTime() {
                        return this.ackTime;
                }

                public void setAckTime(Duration ackTime) {
                        this.ackTime = ackTime;
                }

                public Duration getIdleEventInterval() {
                        return this.idleEventInterval;
                }

                public void setIdleEventInterval(Duration idleEventInterval) {
                        this.idleEventInterval = idleEventInterval;
                }

                public Duration getMonitorInterval() {
                        return this.monitorInterval;
                }

                public void setMonitorInterval(Duration monitorInterval) {
                        this.monitorInterval = monitorInterval;
                }

                public Boolean getLogContainerConfig() {
                        return this.logContainerConfig;
                }

                public void setLogContainerConfig(Boolean logContainerConfig) {
                        this.logContainerConfig = logContainerConfig;
                }

        }
980
981        public static class Ssl {
982
983                /**
984                 * Password of the private key in the key store file.
985                 */
986                private String keyPassword;
987
988                /**
989                 * Location of the key store file.
990                 */
991                private Resource keyStoreLocation;
992
993                /**
994                 * Store password for the key store file.
995                 */
996                private String keyStorePassword;
997
998                /**
999                 * Type of the key store.
1000                 */
1001                private String keyStoreType;
1002
1003                /**
1004                 * Location of the trust store file.
1005                 */
1006                private Resource trustStoreLocation;
1007
1008                /**
1009                 * Store password for the trust store file.
1010                 */
1011                private String trustStorePassword;
1012
1013                /**
1014                 * Type of the trust store.
1015                 */
1016                private String trustStoreType;
1017
1018                /**
1019                 * SSL protocol to use.
1020                 */
1021                private String protocol;
1022
1023                public String getKeyPassword() {
1024                        return this.keyPassword;
1025                }
1026
1027                public void setKeyPassword(String keyPassword) {
1028                        this.keyPassword = keyPassword;
1029                }
1030
1031                public Resource getKeyStoreLocation() {
1032                        return this.keyStoreLocation;
1033                }
1034
1035                public void setKeyStoreLocation(Resource keyStoreLocation) {
1036                        this.keyStoreLocation = keyStoreLocation;
1037                }
1038
1039                public String getKeyStorePassword() {
1040                        return this.keyStorePassword;
1041                }
1042
1043                public void setKeyStorePassword(String keyStorePassword) {
1044                        this.keyStorePassword = keyStorePassword;
1045                }
1046
1047                public String getKeyStoreType() {
1048                        return this.keyStoreType;
1049                }
1050
1051                public void setKeyStoreType(String keyStoreType) {
1052                        this.keyStoreType = keyStoreType;
1053                }
1054
1055                public Resource getTrustStoreLocation() {
1056                        return this.trustStoreLocation;
1057                }
1058
1059                public void setTrustStoreLocation(Resource trustStoreLocation) {
1060                        this.trustStoreLocation = trustStoreLocation;
1061                }
1062
1063                public String getTrustStorePassword() {
1064                        return this.trustStorePassword;
1065                }
1066
1067                public void setTrustStorePassword(String trustStorePassword) {
1068                        this.trustStorePassword = trustStorePassword;
1069                }
1070
1071                public String getTrustStoreType() {
1072                        return this.trustStoreType;
1073                }
1074
1075                public void setTrustStoreType(String trustStoreType) {
1076                        this.trustStoreType = trustStoreType;
1077                }
1078
1079                public String getProtocol() {
1080                        return this.protocol;
1081                }
1082
1083                public void setProtocol(String protocol) {
1084                        this.protocol = protocol;
1085                }
1086
1087                public Map<String, Object> buildProperties() {
1088                        Properties properties = new Properties();
1089                        PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
1090                        map.from(this::getKeyPassword)
1091                                        .to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
1092                        map.from(this::getKeyStoreLocation).as(this::resourceToPath)
1093                                        .to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
1094                        map.from(this::getKeyStorePassword)
1095                                        .to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
1096                        map.from(this::getKeyStoreType)
1097                                        .to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
1098                        map.from(this::getTrustStoreLocation).as(this::resourceToPath)
1099                                        .to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
1100                        map.from(this::getTrustStorePassword)
1101                                        .to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
1102                        map.from(this::getTrustStoreType)
1103                                        .to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
1104                        map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG));
1105                        return properties;
1106                }
1107
1108                private String resourceToPath(Resource resource) {
1109                        try {
1110                                return resource.getFile().getAbsolutePath();
1111                        }
1112                        catch (IOException ex) {
1113                                throw new IllegalStateException(
1114                                                "Resource '" + resource + "' must be on a file system", ex);
1115                        }
1116                }
1117
1118        }
1119
1120        public static class Jaas {
1121
1122                /**
1123                 * Whether to enable JAAS configuration.
1124                 */
1125                private boolean enabled;
1126
1127                /**
1128                 * Login module.
1129                 */
1130                private String loginModule = "com.sun.security.auth.module.Krb5LoginModule";
1131
1132                /**
1133                 * Control flag for login configuration.
1134                 */
1135                private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED;
1136
1137                /**
1138                 * Additional JAAS options.
1139                 */
1140                private final Map<String, String> options = new HashMap<>();
1141
1142                public boolean isEnabled() {
1143                        return this.enabled;
1144                }
1145
1146                public void setEnabled(boolean enabled) {
1147                        this.enabled = enabled;
1148                }
1149
1150                public String getLoginModule() {
1151                        return this.loginModule;
1152                }
1153
1154                public void setLoginModule(String loginModule) {
1155                        this.loginModule = loginModule;
1156                }
1157
1158                public KafkaJaasLoginModuleInitializer.ControlFlag getControlFlag() {
1159                        return this.controlFlag;
1160                }
1161
1162                public void setControlFlag(
1163                                KafkaJaasLoginModuleInitializer.ControlFlag controlFlag) {
1164                        this.controlFlag = controlFlag;
1165                }
1166
1167                public Map<String, String> getOptions() {
1168                        return this.options;
1169                }
1170
1171                public void setOptions(Map<String, String> options) {
1172                        if (options != null) {
1173                                this.options.putAll(options);
1174                        }
1175                }
1176
1177        }
1178
1179        @SuppressWarnings("serial")
1180        private static class Properties extends HashMap<String, Object> {
1181
1182                public <V> java.util.function.Consumer<V> in(String key) {
1183                        return (value) -> put(key, value);
1184                }
1185
1186                public Properties with(Ssl ssl, Map<String, String> properties) {
1187                        putAll(ssl.buildProperties());
1188                        putAll(properties);
1189                        return this;
1190                }
1191
1192        }
1193
1194}