001/* 002 * Copyright 2012-2016 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.boot.autoconfigure.kafka; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025 026import org.apache.kafka.clients.CommonClientConfigs; 027import org.apache.kafka.clients.consumer.ConsumerConfig; 028import org.apache.kafka.clients.producer.ProducerConfig; 029import org.apache.kafka.common.config.SslConfigs; 030import org.apache.kafka.common.serialization.StringDeserializer; 031import org.apache.kafka.common.serialization.StringSerializer; 032 033import org.springframework.boot.context.properties.ConfigurationProperties; 034import org.springframework.core.io.Resource; 035import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; 036import org.springframework.util.CollectionUtils; 037 038/** 039 * Configuration properties for Spring for Apache Kafka. 040 * <p> 041 * Users should refer to Kafka documentation for complete descriptions of these 042 * properties. 043 * 044 * @author Gary Russell 045 * @author Stephane Nicoll 046 * @author Artem Bilan 047 * @since 1.5.0 048 */ 049@ConfigurationProperties(prefix = "spring.kafka") 050public class KafkaProperties { 051 052 /** 053 * Comma-delimited list of host:port pairs to use for establishing the initial 054 * connection to the Kafka cluster. 055 */ 056 private List<String> bootstrapServers = new ArrayList<String>( 057 Collections.singletonList("localhost:9092")); 058 059 /** 060 * Id to pass to the server when making requests; used for server-side logging. 061 */ 062 private String clientId; 063 064 /** 065 * Additional properties used to configure the client. 066 */ 067 private Map<String, String> properties = new HashMap<String, String>(); 068 069 private final Consumer consumer = new Consumer(); 070 071 private final Producer producer = new Producer(); 072 073 private final Listener listener = new Listener(); 074 075 private final Ssl ssl = new Ssl(); 076 077 private final Template template = new Template(); 078 079 public List<String> getBootstrapServers() { 080 return this.bootstrapServers; 081 } 082 083 public void setBootstrapServers(List<String> bootstrapServers) { 084 this.bootstrapServers = bootstrapServers; 085 } 086 087 public String getClientId() { 088 return this.clientId; 089 } 090 091 public void setClientId(String clientId) { 092 this.clientId = clientId; 093 } 094 095 public Map<String, String> getProperties() { 096 return this.properties; 097 } 098 099 public void setProperties(Map<String, String> properties) { 100 this.properties = properties; 101 } 102 103 public Consumer getConsumer() { 104 return this.consumer; 105 } 106 107 public Producer getProducer() { 108 return this.producer; 109 } 110 111 public Listener getListener() { 112 return this.listener; 113 } 114 115 public Ssl getSsl() { 116 return this.ssl; 117 } 118 119 public Template getTemplate() { 120 return this.template; 121 } 122 123 private Map<String, Object> buildCommonProperties() { 124 Map<String, Object> properties = new HashMap<String, Object>(); 125 if (this.bootstrapServers != null) { 126 properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 127 this.bootstrapServers); 128 } 129 if (this.clientId != null) { 130 properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); 131 } 132 if (this.ssl.getKeyPassword() != null) { 133 properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.ssl.getKeyPassword()); 134 } 135 if (this.ssl.getKeystoreLocation() != null) { 136 properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 137 resourceToPath(this.ssl.getKeystoreLocation())); 138 } 139 if (this.ssl.getKeystorePassword() != null) { 140 properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 141 this.ssl.getKeystorePassword()); 142 } 143 if (this.ssl.getTruststoreLocation() != null) { 144 properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 145 resourceToPath(this.ssl.getTruststoreLocation())); 146 } 147 if (this.ssl.getTruststorePassword() != null) { 148 properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 149 this.ssl.getTruststorePassword()); 150 } 151 if (!CollectionUtils.isEmpty(this.properties)) { 152 properties.putAll(this.properties); 153 } 154 return properties; 155 } 156 157 /** 158 * Create an initial map of consumer properties from the state of this instance. 159 * <p> 160 * This allows you to add additional properties, if necessary, and override the 161 * default kafkaConsumerFactory bean. 162 * @return the consumer properties initialized with the customizations defined on this 163 * instance 164 */ 165 public Map<String, Object> buildConsumerProperties() { 166 Map<String, Object> properties = buildCommonProperties(); 167 properties.putAll(this.consumer.buildProperties()); 168 return properties; 169 } 170 171 /** 172 * Create an initial map of producer properties from the state of this instance. 173 * <p> 174 * This allows you to add additional properties, if necessary, and override the 175 * default kafkaProducerFactory bean. 176 * @return the producer properties initialized with the customizations defined on this 177 * instance 178 */ 179 public Map<String, Object> buildProducerProperties() { 180 Map<String, Object> properties = buildCommonProperties(); 181 properties.putAll(this.producer.buildProperties()); 182 return properties; 183 } 184 185 private static String resourceToPath(Resource resource) { 186 try { 187 return resource.getFile().getAbsolutePath(); 188 } 189 catch (IOException ex) { 190 throw new IllegalStateException( 191 "Resource '" + resource + "' must be on a file system", ex); 192 } 193 } 194 195 public static class Consumer { 196 197 private final Ssl ssl = new Ssl(); 198 199 /** 200 * Frequency in milliseconds that the consumer offsets are auto-committed to Kafka 201 * if 'enable.auto.commit' true. 202 */ 203 private Integer autoCommitInterval; 204 205 /** 206 * What to do when there is no initial offset in Kafka or if the current offset 207 * does not exist any more on the server. 208 */ 209 private String autoOffsetReset; 210 211 /** 212 * Comma-delimited list of host:port pairs to use for establishing the initial 213 * connection to the Kafka cluster. 214 */ 215 private List<String> bootstrapServers; 216 217 /** 218 * Id to pass to the server when making requests; used for server-side logging. 219 */ 220 private String clientId; 221 222 /** 223 * If true the consumer's offset will be periodically committed in the background. 224 */ 225 private Boolean enableAutoCommit; 226 227 /** 228 * Maximum amount of time in milliseconds the server will block before answering 229 * the fetch request if there isn't sufficient data to immediately satisfy the 230 * requirement given by "fetch.min.bytes". 231 */ 232 private Integer fetchMaxWait; 233 234 /** 235 * Minimum amount of data the server should return for a fetch request in bytes. 236 */ 237 private Integer fetchMinSize; 238 239 /** 240 * Unique string that identifies the consumer group this consumer belongs to. 241 */ 242 private String groupId; 243 244 /** 245 * Expected time in milliseconds between heartbeats to the consumer coordinator. 246 */ 247 private Integer heartbeatInterval; 248 249 /** 250 * Deserializer class for keys. 251 */ 252 private Class<?> keyDeserializer = StringDeserializer.class; 253 254 /** 255 * Deserializer class for values. 256 */ 257 private Class<?> valueDeserializer = StringDeserializer.class; 258 259 /** 260 * Maximum number of records returned in a single call to poll(). 261 */ 262 private Integer maxPollRecords; 263 264 public Ssl getSsl() { 265 return this.ssl; 266 } 267 268 public Integer getAutoCommitInterval() { 269 return this.autoCommitInterval; 270 } 271 272 public void setAutoCommitInterval(Integer autoCommitInterval) { 273 this.autoCommitInterval = autoCommitInterval; 274 } 275 276 public String getAutoOffsetReset() { 277 return this.autoOffsetReset; 278 } 279 280 public void setAutoOffsetReset(String autoOffsetReset) { 281 this.autoOffsetReset = autoOffsetReset; 282 } 283 284 public List<String> getBootstrapServers() { 285 return this.bootstrapServers; 286 } 287 288 public void setBootstrapServers(List<String> bootstrapServers) { 289 this.bootstrapServers = bootstrapServers; 290 } 291 292 public String getClientId() { 293 return this.clientId; 294 } 295 296 public void setClientId(String clientId) { 297 this.clientId = clientId; 298 } 299 300 public Boolean getEnableAutoCommit() { 301 return this.enableAutoCommit; 302 } 303 304 public void setEnableAutoCommit(Boolean enableAutoCommit) { 305 this.enableAutoCommit = enableAutoCommit; 306 } 307 308 public Integer getFetchMaxWait() { 309 return this.fetchMaxWait; 310 } 311 312 public void setFetchMaxWait(Integer fetchMaxWait) { 313 this.fetchMaxWait = fetchMaxWait; 314 } 315 316 public Integer getFetchMinSize() { 317 return this.fetchMinSize; 318 } 319 320 public void setFetchMinSize(Integer fetchMinSize) { 321 this.fetchMinSize = fetchMinSize; 322 } 323 324 public String getGroupId() { 325 return this.groupId; 326 } 327 328 public void setGroupId(String groupId) { 329 this.groupId = groupId; 330 } 331 332 public Integer getHeartbeatInterval() { 333 return this.heartbeatInterval; 334 } 335 336 public void setHeartbeatInterval(Integer heartbeatInterval) { 337 this.heartbeatInterval = heartbeatInterval; 338 } 339 340 public Class<?> getKeyDeserializer() { 341 return this.keyDeserializer; 342 } 343 344 public void setKeyDeserializer(Class<?> keyDeserializer) { 345 this.keyDeserializer = keyDeserializer; 346 } 347 348 public Class<?> getValueDeserializer() { 349 return this.valueDeserializer; 350 } 351 352 public void setValueDeserializer(Class<?> valueDeserializer) { 353 this.valueDeserializer = valueDeserializer; 354 } 355 356 public Integer getMaxPollRecords() { 357 return this.maxPollRecords; 358 } 359 360 public void setMaxPollRecords(Integer maxPollRecords) { 361 this.maxPollRecords = maxPollRecords; 362 } 363 364 public Map<String, Object> buildProperties() { 365 Map<String, Object> properties = new HashMap<String, Object>(); 366 if (this.autoCommitInterval != null) { 367 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 368 this.autoCommitInterval); 369 } 370 if (this.autoOffsetReset != null) { 371 properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 372 this.autoOffsetReset); 373 } 374 if (this.bootstrapServers != null) { 375 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 376 this.bootstrapServers); 377 } 378 if (this.clientId != null) { 379 properties.put(ConsumerConfig.CLIENT_ID_CONFIG, this.clientId); 380 } 381 if (this.enableAutoCommit != null) { 382 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 383 this.enableAutoCommit); 384 } 385 if (this.fetchMaxWait != null) { 386 properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 387 this.fetchMaxWait); 388 } 389 if (this.fetchMinSize != null) { 390 properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, this.fetchMinSize); 391 } 392 if (this.groupId != null) { 393 properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupId); 394 } 395 if (this.heartbeatInterval != null) { 396 properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 397 this.heartbeatInterval); 398 } 399 if (this.keyDeserializer != null) { 400 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 401 this.keyDeserializer); 402 } 403 if (this.ssl.getKeyPassword() != null) { 404 properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 405 this.ssl.getKeyPassword()); 406 } 407 if (this.ssl.getKeystoreLocation() != null) { 408 properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 409 resourceToPath(this.ssl.getKeystoreLocation())); 410 } 411 if (this.ssl.getKeystorePassword() != null) { 412 properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 413 this.ssl.getKeystorePassword()); 414 } 415 if (this.ssl.getTruststoreLocation() != null) { 416 properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 417 resourceToPath(this.ssl.getTruststoreLocation())); 418 } 419 if (this.ssl.getTruststorePassword() != null) { 420 properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 421 this.ssl.getTruststorePassword()); 422 } 423 if (this.valueDeserializer != null) { 424 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 425 this.valueDeserializer); 426 } 427 if (this.maxPollRecords != null) { 428 properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 429 this.maxPollRecords); 430 } 431 return properties; 432 } 433 434 } 435 436 public static class Producer { 437 438 private final Ssl ssl = new Ssl(); 439 440 /** 441 * Number of acknowledgments the producer requires the leader to have received 442 * before considering a request complete. 443 */ 444 private String acks; 445 446 /** 447 * Number of records to batch before sending. 448 */ 449 private Integer batchSize; 450 451 /** 452 * Comma-delimited list of host:port pairs to use for establishing the initial 453 * connection to the Kafka cluster. 454 */ 455 private List<String> bootstrapServers; 456 457 /** 458 * Total bytes of memory the producer can use to buffer records waiting to be sent 459 * to the server. 460 */ 461 private Long bufferMemory; 462 463 /** 464 * Id to pass to the server when making requests; used for server-side logging. 465 */ 466 private String clientId; 467 468 /** 469 * Compression type for all data generated by the producer. 470 */ 471 private String compressionType; 472 473 /** 474 * Serializer class for keys. 475 */ 476 private Class<?> keySerializer = StringSerializer.class; 477 478 /** 479 * Serializer class for values. 480 */ 481 private Class<?> valueSerializer = StringSerializer.class; 482 483 /** 484 * When greater than zero, enables retrying of failed sends. 485 */ 486 private Integer retries; 487 488 public Ssl getSsl() { 489 return this.ssl; 490 } 491 492 public String getAcks() { 493 return this.acks; 494 } 495 496 public void setAcks(String acks) { 497 this.acks = acks; 498 } 499 500 public Integer getBatchSize() { 501 return this.batchSize; 502 } 503 504 public void setBatchSize(Integer batchSize) { 505 this.batchSize = batchSize; 506 } 507 508 public List<String> getBootstrapServers() { 509 return this.bootstrapServers; 510 } 511 512 public void setBootstrapServers(List<String> bootstrapServers) { 513 this.bootstrapServers = bootstrapServers; 514 } 515 516 public Long getBufferMemory() { 517 return this.bufferMemory; 518 } 519 520 public void setBufferMemory(Long bufferMemory) { 521 this.bufferMemory = bufferMemory; 522 } 523 524 public String getClientId() { 525 return this.clientId; 526 } 527 528 public void setClientId(String clientId) { 529 this.clientId = clientId; 530 } 531 532 public String getCompressionType() { 533 return this.compressionType; 534 } 535 536 public void setCompressionType(String compressionType) { 537 this.compressionType = compressionType; 538 } 539 540 public Class<?> getKeySerializer() { 541 return this.keySerializer; 542 } 543 544 public void setKeySerializer(Class<?> keySerializer) { 545 this.keySerializer = keySerializer; 546 } 547 548 public Class<?> getValueSerializer() { 549 return this.valueSerializer; 550 } 551 552 public void setValueSerializer(Class<?> valueSerializer) { 553 this.valueSerializer = valueSerializer; 554 } 555 556 public Integer getRetries() { 557 return this.retries; 558 } 559 560 public void setRetries(Integer retries) { 561 this.retries = retries; 562 } 563 564 public Map<String, Object> buildProperties() { 565 Map<String, Object> properties = new HashMap<String, Object>(); 566 if (this.acks != null) { 567 properties.put(ProducerConfig.ACKS_CONFIG, this.acks); 568 } 569 if (this.batchSize != null) { 570 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, this.batchSize); 571 } 572 if (this.bootstrapServers != null) { 573 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 574 this.bootstrapServers); 575 } 576 if (this.bufferMemory != null) { 577 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.bufferMemory); 578 } 579 if (this.clientId != null) { 580 properties.put(ProducerConfig.CLIENT_ID_CONFIG, this.clientId); 581 } 582 if (this.compressionType != null) { 583 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, 584 this.compressionType); 585 } 586 if (this.keySerializer != null) { 587 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 588 this.keySerializer); 589 } 590 if (this.retries != null) { 591 properties.put(ProducerConfig.RETRIES_CONFIG, this.retries); 592 } 593 if (this.ssl.getKeyPassword() != null) { 594 properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 595 this.ssl.getKeyPassword()); 596 } 597 if (this.ssl.getKeystoreLocation() != null) { 598 properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, 599 resourceToPath(this.ssl.getKeystoreLocation())); 600 } 601 if (this.ssl.getKeystorePassword() != null) { 602 properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 603 this.ssl.getKeystorePassword()); 604 } 605 if (this.ssl.getTruststoreLocation() != null) { 606 properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 607 resourceToPath(this.ssl.getTruststoreLocation())); 608 } 609 if (this.ssl.getTruststorePassword() != null) { 610 properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 611 this.ssl.getTruststorePassword()); 612 } 613 if (this.valueSerializer != null) { 614 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 615 this.valueSerializer); 616 } 617 return properties; 618 } 619 620 } 621 622 public static class Template { 623 624 /** 625 * Default topic to which messages will be sent. 626 */ 627 private String defaultTopic; 628 629 public String getDefaultTopic() { 630 return this.defaultTopic; 631 } 632 633 public void setDefaultTopic(String defaultTopic) { 634 this.defaultTopic = defaultTopic; 635 } 636 637 } 638 639 public static class Listener { 640 641 /** 642 * Listener AckMode; see the spring-kafka documentation. 643 */ 644 private AckMode ackMode; 645 646 /** 647 * Number of threads to run in the listener containers. 648 */ 649 private Integer concurrency; 650 651 /** 652 * Timeout in milliseconds to use when polling the consumer. 653 */ 654 private Long pollTimeout; 655 656 /** 657 * Number of records between offset commits when ackMode is "COUNT" or 658 * "COUNT_TIME". 659 */ 660 private Integer ackCount; 661 662 /** 663 * Time in milliseconds between offset commits when ackMode is "TIME" or 664 * "COUNT_TIME". 665 */ 666 private Long ackTime; 667 668 public AckMode getAckMode() { 669 return this.ackMode; 670 } 671 672 public void setAckMode(AckMode ackMode) { 673 this.ackMode = ackMode; 674 } 675 676 public Integer getConcurrency() { 677 return this.concurrency; 678 } 679 680 public void setConcurrency(Integer concurrency) { 681 this.concurrency = concurrency; 682 } 683 684 public Long getPollTimeout() { 685 return this.pollTimeout; 686 } 687 688 public void setPollTimeout(Long pollTimeout) { 689 this.pollTimeout = pollTimeout; 690 } 691 692 public Integer getAckCount() { 693 return this.ackCount; 694 } 695 696 public void setAckCount(Integer ackCount) { 697 this.ackCount = ackCount; 698 } 699 700 public Long getAckTime() { 701 return this.ackTime; 702 } 703 704 public void setAckTime(Long ackTime) { 705 this.ackTime = ackTime; 706 } 707 708 } 709 710 public static class Ssl { 711 712 /** 713 * Password of the private key in the key store file. 714 */ 715 private String keyPassword; 716 717 /** 718 * Location of the key store file. 719 */ 720 private Resource keystoreLocation; 721 722 /** 723 * Store password for the key store file. 724 */ 725 private String keystorePassword; 726 727 /** 728 * Location of the trust store file. 729 */ 730 private Resource truststoreLocation; 731 732 /** 733 * Store password for the trust store file. 734 */ 735 private String truststorePassword; 736 737 public String getKeyPassword() { 738 return this.keyPassword; 739 } 740 741 public void setKeyPassword(String keyPassword) { 742 this.keyPassword = keyPassword; 743 } 744 745 public Resource getKeystoreLocation() { 746 return this.keystoreLocation; 747 } 748 749 public void setKeystoreLocation(Resource keystoreLocation) { 750 this.keystoreLocation = keystoreLocation; 751 } 752 753 public String getKeystorePassword() { 754 return this.keystorePassword; 755 } 756 757 public void setKeystorePassword(String keystorePassword) { 758 this.keystorePassword = keystorePassword; 759 } 760 761 public Resource getTruststoreLocation() { 762 return this.truststoreLocation; 763 } 764 765 public void setTruststoreLocation(Resource truststoreLocation) { 766 this.truststoreLocation = truststoreLocation; 767 } 768 769 public String getTruststorePassword() { 770 return this.truststorePassword; 771 } 772 773 public void setTruststorePassword(String truststorePassword) { 774 this.truststorePassword = truststorePassword; 775 } 776 777 } 778 779}