/*
 * Copyright 2012-2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.boot.autoconfigure.kafka;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025
026import org.apache.kafka.clients.CommonClientConfigs;
027import org.apache.kafka.clients.consumer.ConsumerConfig;
028import org.apache.kafka.clients.producer.ProducerConfig;
029import org.apache.kafka.common.config.SslConfigs;
030import org.apache.kafka.common.serialization.StringDeserializer;
031import org.apache.kafka.common.serialization.StringSerializer;
032
033import org.springframework.boot.context.properties.ConfigurationProperties;
034import org.springframework.core.io.Resource;
035import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
036import org.springframework.util.CollectionUtils;
037
038/**
039 * Configuration properties for Spring for Apache Kafka.
040 * <p>
041 * Users should refer to Kafka documentation for complete descriptions of these
042 * properties.
043 *
044 * @author Gary Russell
045 * @author Stephane Nicoll
046 * @author Artem Bilan
047 * @since 1.5.0
048 */
049@ConfigurationProperties(prefix = "spring.kafka")
050public class KafkaProperties {
051
052        /**
053         * Comma-delimited list of host:port pairs to use for establishing the initial
054         * connection to the Kafka cluster.
055         */
056        private List<String> bootstrapServers = new ArrayList<String>(
057                        Collections.singletonList("localhost:9092"));
058
059        /**
060         * Id to pass to the server when making requests; used for server-side logging.
061         */
062        private String clientId;
063
064        /**
065         * Additional properties used to configure the client.
066         */
067        private Map<String, String> properties = new HashMap<String, String>();
068
069        private final Consumer consumer = new Consumer();
070
071        private final Producer producer = new Producer();
072
073        private final Listener listener = new Listener();
074
075        private final Ssl ssl = new Ssl();
076
077        private final Template template = new Template();
078
079        public List<String> getBootstrapServers() {
080                return this.bootstrapServers;
081        }
082
083        public void setBootstrapServers(List<String> bootstrapServers) {
084                this.bootstrapServers = bootstrapServers;
085        }
086
087        public String getClientId() {
088                return this.clientId;
089        }
090
091        public void setClientId(String clientId) {
092                this.clientId = clientId;
093        }
094
095        public Map<String, String> getProperties() {
096                return this.properties;
097        }
098
099        public void setProperties(Map<String, String> properties) {
100                this.properties = properties;
101        }
102
103        public Consumer getConsumer() {
104                return this.consumer;
105        }
106
107        public Producer getProducer() {
108                return this.producer;
109        }
110
111        public Listener getListener() {
112                return this.listener;
113        }
114
115        public Ssl getSsl() {
116                return this.ssl;
117        }
118
119        public Template getTemplate() {
120                return this.template;
121        }
122
123        private Map<String, Object> buildCommonProperties() {
124                Map<String, Object> properties = new HashMap<String, Object>();
125                if (this.bootstrapServers != null) {
126                        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
127                                        this.bootstrapServers);
128                }
129                if (this.clientId != null) {
130                        properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
131                }
132                if (this.ssl.getKeyPassword() != null) {
133                        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword());
134                }
135                if (this.ssl.getKeystoreLocation() != null) {
136                        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
137                                        resourceToPath(this.ssl.getKeystoreLocation()));
138                }
139                if (this.ssl.getKeystorePassword() != null) {
140                        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
141                                        this.ssl.getKeystorePassword());
142                }
143                if (this.ssl.getTruststoreLocation() != null) {
144                        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
145                                        resourceToPath(this.ssl.getTruststoreLocation()));
146                }
147                if (this.ssl.getTruststorePassword() != null) {
148                        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
149                                        this.ssl.getTruststorePassword());
150                }
151                if (!CollectionUtils.isEmpty(this.properties)) {
152                        properties.putAll(this.properties);
153                }
154                return properties;
155        }
156
157        /**
158         * Create an initial map of consumer properties from the state of this instance.
159         * <p>
160         * This allows you to add additional properties, if necessary, and override the
161         * default kafkaConsumerFactory bean.
162         * @return the consumer properties initialized with the customizations defined on this
163         * instance
164         */
165        public Map<String, Object> buildConsumerProperties() {
166                Map<String, Object> properties = buildCommonProperties();
167                properties.putAll(this.consumer.buildProperties());
168                return properties;
169        }
170
171        /**
172         * Create an initial map of producer properties from the state of this instance.
173         * <p>
174         * This allows you to add additional properties, if necessary, and override the
175         * default kafkaProducerFactory bean.
176         * @return the producer properties initialized with the customizations defined on this
177         * instance
178         */
179        public Map<String, Object> buildProducerProperties() {
180                Map<String, Object> properties = buildCommonProperties();
181                properties.putAll(this.producer.buildProperties());
182                return properties;
183        }
184
185        private static String resourceToPath(Resource resource) {
186                try {
187                        return resource.getFile().getAbsolutePath();
188                }
189                catch (IOException ex) {
190                        throw new IllegalStateException(
191                                        "Resource '" + resource + "' must be on a file system", ex);
192                }
193        }
194
195        public static class Consumer {
196
197                private final Ssl ssl = new Ssl();
198
199                /**
200                 * Frequency in milliseconds that the consumer offsets are auto-committed to Kafka
201                 * if 'enable.auto.commit' true.
202                 */
203                private Integer autoCommitInterval;
204
205                /**
206                 * What to do when there is no initial offset in Kafka or if the current offset
207                 * does not exist any more on the server.
208                 */
209                private String autoOffsetReset;
210
211                /**
212                 * Comma-delimited list of host:port pairs to use for establishing the initial
213                 * connection to the Kafka cluster.
214                 */
215                private List<String> bootstrapServers;
216
217                /**
218                 * Id to pass to the server when making requests; used for server-side logging.
219                 */
220                private String clientId;
221
222                /**
223                 * If true the consumer's offset will be periodically committed in the background.
224                 */
225                private Boolean enableAutoCommit;
226
227                /**
228                 * Maximum amount of time in milliseconds the server will block before answering
229                 * the fetch request if there isn't sufficient data to immediately satisfy the
230                 * requirement given by "fetch.min.bytes".
231                 */
232                private Integer fetchMaxWait;
233
234                /**
235                 * Minimum amount of data the server should return for a fetch request in bytes.
236                 */
237                private Integer fetchMinSize;
238
239                /**
240                 * Unique string that identifies the consumer group this consumer belongs to.
241                 */
242                private String groupId;
243
244                /**
245                 * Expected time in milliseconds between heartbeats to the consumer coordinator.
246                 */
247                private Integer heartbeatInterval;
248
249                /**
250                 * Deserializer class for keys.
251                 */
252                private Class<?> keyDeserializer = StringDeserializer.class;
253
254                /**
255                 * Deserializer class for values.
256                 */
257                private Class<?> valueDeserializer = StringDeserializer.class;
258
259                /**
260                 * Maximum number of records returned in a single call to poll().
261                 */
262                private Integer maxPollRecords;
263
264                public Ssl getSsl() {
265                        return this.ssl;
266                }
267
268                public Integer getAutoCommitInterval() {
269                        return this.autoCommitInterval;
270                }
271
272                public void setAutoCommitInterval(Integer autoCommitInterval) {
273                        this.autoCommitInterval = autoCommitInterval;
274                }
275
276                public String getAutoOffsetReset() {
277                        return this.autoOffsetReset;
278                }
279
280                public void setAutoOffsetReset(String autoOffsetReset) {
281                        this.autoOffsetReset = autoOffsetReset;
282                }
283
284                public List<String> getBootstrapServers() {
285                        return this.bootstrapServers;
286                }
287
288                public void setBootstrapServers(List<String> bootstrapServers) {
289                        this.bootstrapServers = bootstrapServers;
290                }
291
292                public String getClientId() {
293                        return this.clientId;
294                }
295
296                public void setClientId(String clientId) {
297                        this.clientId = clientId;
298                }
299
300                public Boolean getEnableAutoCommit() {
301                        return this.enableAutoCommit;
302                }
303
304                public void setEnableAutoCommit(Boolean enableAutoCommit) {
305                        this.enableAutoCommit = enableAutoCommit;
306                }
307
308                public Integer getFetchMaxWait() {
309                        return this.fetchMaxWait;
310                }
311
312                public void setFetchMaxWait(Integer fetchMaxWait) {
313                        this.fetchMaxWait = fetchMaxWait;
314                }
315
316                public Integer getFetchMinSize() {
317                        return this.fetchMinSize;
318                }
319
320                public void setFetchMinSize(Integer fetchMinSize) {
321                        this.fetchMinSize = fetchMinSize;
322                }
323
324                public String getGroupId() {
325                        return this.groupId;
326                }
327
328                public void setGroupId(String groupId) {
329                        this.groupId = groupId;
330                }
331
332                public Integer getHeartbeatInterval() {
333                        return this.heartbeatInterval;
334                }
335
336                public void setHeartbeatInterval(Integer heartbeatInterval) {
337                        this.heartbeatInterval = heartbeatInterval;
338                }
339
340                public Class<?> getKeyDeserializer() {
341                        return this.keyDeserializer;
342                }
343
344                public void setKeyDeserializer(Class<?> keyDeserializer) {
345                        this.keyDeserializer = keyDeserializer;
346                }
347
348                public Class<?> getValueDeserializer() {
349                        return this.valueDeserializer;
350                }
351
352                public void setValueDeserializer(Class<?> valueDeserializer) {
353                        this.valueDeserializer = valueDeserializer;
354                }
355
356                public Integer getMaxPollRecords() {
357                        return this.maxPollRecords;
358                }
359
360                public void setMaxPollRecords(Integer maxPollRecords) {
361                        this.maxPollRecords = maxPollRecords;
362                }
363
364                public Map<String, Object> buildProperties() {
365                        Map<String, Object> properties = new HashMap<String, Object>();
366                        if (this.autoCommitInterval != null) {
367                                properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
368                                                this.autoCommitInterval);
369                        }
370                        if (this.autoOffsetReset != null) {
371                                properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
372                                                this.autoOffsetReset);
373                        }
374                        if (this.bootstrapServers != null) {
375                                properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
376                                                this.bootstrapServers);
377                        }
378                        if (this.clientId != null) {
379                                properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId);
380                        }
381                        if (this.enableAutoCommit != null) {
382                                properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
383                                                this.enableAutoCommit);
384                        }
385                        if (this.fetchMaxWait != null) {
386                                properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
387                                                this.fetchMaxWait);
388                        }
389                        if (this.fetchMinSize != null) {
390                                properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize);
391                        }
392                        if (this.groupId != null) {
393                                properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId);
394                        }
395                        if (this.heartbeatInterval != null) {
396                                properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG,
397                                                this.heartbeatInterval);
398                        }
399                        if (this.keyDeserializer != null) {
400                                properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
401                                                this.keyDeserializer);
402                        }
403                        if (this.ssl.getKeyPassword() != null) {
404                                properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
405                                                this.ssl.getKeyPassword());
406                        }
407                        if (this.ssl.getKeystoreLocation() != null) {
408                                properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
409                                                resourceToPath(this.ssl.getKeystoreLocation()));
410                        }
411                        if (this.ssl.getKeystorePassword() != null) {
412                                properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
413                                                this.ssl.getKeystorePassword());
414                        }
415                        if (this.ssl.getTruststoreLocation() != null) {
416                                properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
417                                                resourceToPath(this.ssl.getTruststoreLocation()));
418                        }
419                        if (this.ssl.getTruststorePassword() != null) {
420                                properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
421                                                this.ssl.getTruststorePassword());
422                        }
423                        if (this.valueDeserializer != null) {
424                                properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
425                                                this.valueDeserializer);
426                        }
427                        if (this.maxPollRecords != null) {
428                                properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
429                                                this.maxPollRecords);
430                        }
431                        return properties;
432                }
433
434        }
435
436        public static class Producer {
437
438                private final Ssl ssl = new Ssl();
439
440                /**
441                 * Number of acknowledgments the producer requires the leader to have received
442                 * before considering a request complete.
443                 */
444                private String acks;
445
446                /**
447                 * Number of records to batch before sending.
448                 */
449                private Integer batchSize;
450
451                /**
452                 * Comma-delimited list of host:port pairs to use for establishing the initial
453                 * connection to the Kafka cluster.
454                 */
455                private List<String> bootstrapServers;
456
457                /**
458                 * Total bytes of memory the producer can use to buffer records waiting to be sent
459                 * to the server.
460                 */
461                private Long bufferMemory;
462
463                /**
464                 * Id to pass to the server when making requests; used for server-side logging.
465                 */
466                private String clientId;
467
468                /**
469                 * Compression type for all data generated by the producer.
470                 */
471                private String compressionType;
472
473                /**
474                 * Serializer class for keys.
475                 */
476                private Class<?> keySerializer = StringSerializer.class;
477
478                /**
479                 * Serializer class for values.
480                 */
481                private Class<?> valueSerializer = StringSerializer.class;
482
483                /**
484                 * When greater than zero, enables retrying of failed sends.
485                 */
486                private Integer retries;
487
488                public Ssl getSsl() {
489                        return this.ssl;
490                }
491
492                public String getAcks() {
493                        return this.acks;
494                }
495
496                public void setAcks(String acks) {
497                        this.acks = acks;
498                }
499
500                public Integer getBatchSize() {
501                        return this.batchSize;
502                }
503
504                public void setBatchSize(Integer batchSize) {
505                        this.batchSize = batchSize;
506                }
507
508                public List<String> getBootstrapServers() {
509                        return this.bootstrapServers;
510                }
511
512                public void setBootstrapServers(List<String> bootstrapServers) {
513                        this.bootstrapServers = bootstrapServers;
514                }
515
516                public Long getBufferMemory() {
517                        return this.bufferMemory;
518                }
519
520                public void setBufferMemory(Long bufferMemory) {
521                        this.bufferMemory = bufferMemory;
522                }
523
524                public String getClientId() {
525                        return this.clientId;
526                }
527
528                public void setClientId(String clientId) {
529                        this.clientId = clientId;
530                }
531
532                public String getCompressionType() {
533                        return this.compressionType;
534                }
535
536                public void setCompressionType(String compressionType) {
537                        this.compressionType = compressionType;
538                }
539
540                public Class<?> getKeySerializer() {
541                        return this.keySerializer;
542                }
543
544                public void setKeySerializer(Class<?> keySerializer) {
545                        this.keySerializer = keySerializer;
546                }
547
548                public Class<?> getValueSerializer() {
549                        return this.valueSerializer;
550                }
551
552                public void setValueSerializer(Class<?> valueSerializer) {
553                        this.valueSerializer = valueSerializer;
554                }
555
556                public Integer getRetries() {
557                        return this.retries;
558                }
559
560                public void setRetries(Integer retries) {
561                        this.retries = retries;
562                }
563
564                public Map<String, Object> buildProperties() {
565                        Map<String, Object> properties = new HashMap<String, Object>();
566                        if (this.acks != null) {
567                                properties.put(ProducerConfig.ACKS_CONFIG, this.acks);
568                        }
569                        if (this.batchSize != null) {
570                                properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize);
571                        }
572                        if (this.bootstrapServers != null) {
573                                properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
574                                                this.bootstrapServers);
575                        }
576                        if (this.bufferMemory != null) {
577                                properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory);
578                        }
579                        if (this.clientId != null) {
580                                properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId);
581                        }
582                        if (this.compressionType != null) {
583                                properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
584                                                this.compressionType);
585                        }
586                        if (this.keySerializer != null) {
587                                properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
588                                                this.keySerializer);
589                        }
590                        if (this.retries != null) {
591                                properties.put(ProducerConfig.RETRIES_CONFIG, this.retries);
592                        }
593                        if (this.ssl.getKeyPassword() != null) {
594                                properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,
595                                                this.ssl.getKeyPassword());
596                        }
597                        if (this.ssl.getKeystoreLocation() != null) {
598                                properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
599                                                resourceToPath(this.ssl.getKeystoreLocation()));
600                        }
601                        if (this.ssl.getKeystorePassword() != null) {
602                                properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
603                                                this.ssl.getKeystorePassword());
604                        }
605                        if (this.ssl.getTruststoreLocation() != null) {
606                                properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
607                                                resourceToPath(this.ssl.getTruststoreLocation()));
608                        }
609                        if (this.ssl.getTruststorePassword() != null) {
610                                properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
611                                                this.ssl.getTruststorePassword());
612                        }
613                        if (this.valueSerializer != null) {
614                                properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
615                                                this.valueSerializer);
616                        }
617                        return properties;
618                }
619
620        }
621
622        public static class Template {
623
624                /**
625                 * Default topic to which messages will be sent.
626                 */
627                private String defaultTopic;
628
629                public String getDefaultTopic() {
630                        return this.defaultTopic;
631                }
632
633                public void setDefaultTopic(String defaultTopic) {
634                        this.defaultTopic = defaultTopic;
635                }
636
637        }
638
639        public static class Listener {
640
641                /**
642                 * Listener AckMode; see the spring-kafka documentation.
643                 */
644                private AckMode ackMode;
645
646                /**
647                 * Number of threads to run in the listener containers.
648                 */
649                private Integer concurrency;
650
651                /**
652                 * Timeout in milliseconds to use when polling the consumer.
653                 */
654                private Long pollTimeout;
655
656                /**
657                 * Number of records between offset commits when ackMode is "COUNT" or
658                 * "COUNT_TIME".
659                 */
660                private Integer ackCount;
661
662                /**
663                 * Time in milliseconds between offset commits when ackMode is "TIME" or
664                 * "COUNT_TIME".
665                 */
666                private Long ackTime;
667
668                public AckMode getAckMode() {
669                        return this.ackMode;
670                }
671
672                public void setAckMode(AckMode ackMode) {
673                        this.ackMode = ackMode;
674                }
675
676                public Integer getConcurrency() {
677                        return this.concurrency;
678                }
679
680                public void setConcurrency(Integer concurrency) {
681                        this.concurrency = concurrency;
682                }
683
684                public Long getPollTimeout() {
685                        return this.pollTimeout;
686                }
687
688                public void setPollTimeout(Long pollTimeout) {
689                        this.pollTimeout = pollTimeout;
690                }
691
692                public Integer getAckCount() {
693                        return this.ackCount;
694                }
695
696                public void setAckCount(Integer ackCount) {
697                        this.ackCount = ackCount;
698                }
699
700                public Long getAckTime() {
701                        return this.ackTime;
702                }
703
704                public void setAckTime(Long ackTime) {
705                        this.ackTime = ackTime;
706                }
707
708        }
709
710        public static class Ssl {
711
712                /**
713                 * Password of the private key in the key store file.
714                 */
715                private String keyPassword;
716
717                /**
718                 * Location of the key store file.
719                 */
720                private Resource keystoreLocation;
721
722                /**
723                 * Store password for the key store file.
724                 */
725                private String keystorePassword;
726
727                /**
728                 * Location of the trust store file.
729                 */
730                private Resource truststoreLocation;
731
732                /**
733                 * Store password for the trust store file.
734                 */
735                private String truststorePassword;
736
737                public String getKeyPassword() {
738                        return this.keyPassword;
739                }
740
741                public void setKeyPassword(String keyPassword) {
742                        this.keyPassword = keyPassword;
743                }
744
745                public Resource getKeystoreLocation() {
746                        return this.keystoreLocation;
747                }
748
749                public void setKeystoreLocation(Resource keystoreLocation) {
750                        this.keystoreLocation = keystoreLocation;
751                }
752
753                public String getKeystorePassword() {
754                        return this.keystorePassword;
755                }
756
757                public void setKeystorePassword(String keystorePassword) {
758                        this.keystorePassword = keystorePassword;
759                }
760
761                public Resource getTruststoreLocation() {
762                        return this.truststoreLocation;
763                }
764
765                public void setTruststoreLocation(Resource truststoreLocation) {
766                        this.truststoreLocation = truststoreLocation;
767                }
768
769                public String getTruststorePassword() {
770                        return this.truststorePassword;
771                }
772
773                public void setTruststorePassword(String truststorePassword) {
774                        this.truststorePassword = truststorePassword;
775                }
776
777        }
778
779}