001/* 002 * Copyright 2012-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.boot.autoconfigure.kafka; 018 019import java.io.IOException; 020import java.time.Duration; 021import java.time.temporal.ChronoUnit; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.List; 026import java.util.Map; 027 028import org.apache.kafka.clients.CommonClientConfigs; 029import org.apache.kafka.clients.consumer.ConsumerConfig; 030import org.apache.kafka.clients.producer.ProducerConfig; 031import org.apache.kafka.common.config.SslConfigs; 032import org.apache.kafka.common.serialization.StringDeserializer; 033import org.apache.kafka.common.serialization.StringSerializer; 034 035import org.springframework.boot.context.properties.ConfigurationProperties; 036import org.springframework.boot.context.properties.DeprecatedConfigurationProperty; 037import org.springframework.boot.context.properties.PropertyMapper; 038import org.springframework.boot.convert.DurationUnit; 039import org.springframework.core.io.Resource; 040import org.springframework.kafka.listener.ContainerProperties.AckMode; 041import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; 042import org.springframework.util.CollectionUtils; 043import org.springframework.util.unit.DataSize; 044 045/** 046 * Configuration properties for Spring for Apache Kafka. 047 * <p> 048 * Users should refer to Kafka documentation for complete descriptions of these 049 * properties. 050 * 051 * @author Gary Russell 052 * @author Stephane Nicoll 053 * @author Artem Bilan 054 * @author Nakul Mishra 055 * @since 1.5.0 056 */ 057@ConfigurationProperties(prefix = "spring.kafka") 058public class KafkaProperties { 059 060 /** 061 * Comma-delimited list of host:port pairs to use for establishing the initial 062 * connections to the Kafka cluster. Applies to all components unless overridden. 063 */ 064 private List<String> bootstrapServers = new ArrayList<>( 065 Collections.singletonList("localhost:9092")); 066 067 /** 068 * ID to pass to the server when making requests. Used for server-side logging. 069 */ 070 private String clientId; 071 072 /** 073 * Additional properties, common to producers and consumers, used to configure the 074 * client. 075 */ 076 private final Map<String, String> properties = new HashMap<>(); 077 078 private final Consumer consumer = new Consumer(); 079 080 private final Producer producer = new Producer(); 081 082 private final Admin admin = new Admin(); 083 084 private final Streams streams = new Streams(); 085 086 private final Listener listener = new Listener(); 087 088 private final Ssl ssl = new Ssl(); 089 090 private final Jaas jaas = new Jaas(); 091 092 private final Template template = new Template(); 093 094 public List<String> getBootstrapServers() { 095 return this.bootstrapServers; 096 } 097 098 public void setBootstrapServers(List<String> bootstrapServers) { 099 this.bootstrapServers = bootstrapServers; 100 } 101 102 public String getClientId() { 103 return this.clientId; 104 } 105 106 public void setClientId(String clientId) { 107 this.clientId = clientId; 108 } 109 110 public Map<String, String> getProperties() { 111 return this.properties; 112 } 113 114 public Consumer getConsumer() { 115 return this.consumer; 116 } 117 118 public Producer getProducer() { 119 return this.producer; 120 } 121 122 public Listener getListener() { 123 return this.listener; 124 } 125 126 public Admin getAdmin() { 127 return this.admin; 128 } 129 130 public Streams getStreams() { 131 return this.streams; 132 } 133 134 public Ssl getSsl() { 135 return this.ssl; 136 } 137 138 public Jaas getJaas() { 139 return this.jaas; 140 } 141 142 public Template getTemplate() { 143 return this.template; 144 } 145 146 private Map<String, Object> buildCommonProperties() { 147 Map<String, Object> properties = new HashMap<>(); 148 if (this.bootstrapServers != null) { 149 properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 150 this.bootstrapServers); 151 } 152 if (this.clientId != null) { 153 properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); 154 } 155 properties.putAll(this.ssl.buildProperties()); 156 if (!CollectionUtils.isEmpty(this.properties)) { 157 properties.putAll(this.properties); 158 } 159 return properties; 160 } 161 162 /** 163 * Create an initial map of consumer properties from the state of this instance. 164 * <p> 165 * This allows you to add additional properties, if necessary, and override the 166 * default kafkaConsumerFactory bean. 167 * @return the consumer properties initialized with the customizations defined on this 168 * instance 169 */ 170 public Map<String, Object> buildConsumerProperties() { 171 Map<String, Object> properties = buildCommonProperties(); 172 properties.putAll(this.consumer.buildProperties()); 173 return properties; 174 } 175 176 /** 177 * Create an initial map of producer properties from the state of this instance. 178 * <p> 179 * This allows you to add additional properties, if necessary, and override the 180 * default kafkaProducerFactory bean. 181 * @return the producer properties initialized with the customizations defined on this 182 * instance 183 */ 184 public Map<String, Object> buildProducerProperties() { 185 Map<String, Object> properties = buildCommonProperties(); 186 properties.putAll(this.producer.buildProperties()); 187 return properties; 188 } 189 190 /** 191 * Create an initial map of admin properties from the state of this instance. 192 * <p> 193 * This allows you to add additional properties, if necessary, and override the 194 * default kafkaAdmin bean. 195 * @return the admin properties initialized with the customizations defined on this 196 * instance 197 */ 198 public Map<String, Object> buildAdminProperties() { 199 Map<String, Object> properties = buildCommonProperties(); 200 properties.putAll(this.admin.buildProperties()); 201 return properties; 202 } 203 204 /** 205 * Create an initial map of streams properties from the state of this instance. 206 * <p> 207 * This allows you to add additional properties, if necessary. 208 * @return the streams properties initialized with the customizations defined on this 209 * instance 210 */ 211 public Map<String, Object> buildStreamsProperties() { 212 Map<String, Object> properties = buildCommonProperties(); 213 properties.putAll(this.streams.buildProperties()); 214 return properties; 215 } 216 217 public static class Consumer { 218 219 private final Ssl ssl = new Ssl(); 220 221 /** 222 * Frequency with which the consumer offsets are auto-committed to Kafka if 223 * 'enable.auto.commit' is set to true. 224 */ 225 private Duration autoCommitInterval; 226 227 /** 228 * What to do when there is no initial offset in Kafka or if the current offset no 229 * longer exists on the server. 230 */ 231 private String autoOffsetReset; 232 233 /** 234 * Comma-delimited list of host:port pairs to use for establishing the initial 235 * connections to the Kafka cluster. Overrides the global property, for consumers. 236 */ 237 private List<String> bootstrapServers; 238 239 /** 240 * ID to pass to the server when making requests. Used for server-side logging. 241 */ 242 private String clientId; 243 244 /** 245 * Whether the consumer's offset is periodically committed in the background. 246 */ 247 private Boolean enableAutoCommit; 248 249 /** 250 * Maximum amount of time the server blocks before answering the fetch request if 251 * there isn't sufficient data to immediately satisfy the requirement given by 252 * "fetch-min-size". 253 */ 254 private Duration fetchMaxWait; 255 256 /** 257 * Minimum amount of data the server should return for a fetch request. 258 */ 259 private DataSize fetchMinSize; 260 261 /** 262 * Unique string that identifies the consumer group to which this consumer 263 * belongs. 264 */ 265 private String groupId; 266 267 /** 268 * Expected time between heartbeats to the consumer coordinator. 269 */ 270 private Duration heartbeatInterval; 271 272 /** 273 * Deserializer class for keys. 274 */ 275 private Class<?> keyDeserializer = StringDeserializer.class; 276 277 /** 278 * Deserializer class for values. 279 */ 280 private Class<?> valueDeserializer = StringDeserializer.class; 281 282 /** 283 * Maximum number of records returned in a single call to poll(). 284 */ 285 private Integer maxPollRecords; 286 287 /** 288 * Additional consumer-specific properties used to configure the client. 289 */ 290 private final Map<String, String> properties = new HashMap<>(); 291 292 public Ssl getSsl() { 293 return this.ssl; 294 } 295 296 public Duration getAutoCommitInterval() { 297 return this.autoCommitInterval; 298 } 299 300 public void setAutoCommitInterval(Duration autoCommitInterval) { 301 this.autoCommitInterval = autoCommitInterval; 302 } 303 304 public String getAutoOffsetReset() { 305 return this.autoOffsetReset; 306 } 307 308 public void setAutoOffsetReset(String autoOffsetReset) { 309 this.autoOffsetReset = autoOffsetReset; 310 } 311 312 public List<String> getBootstrapServers() { 313 return this.bootstrapServers; 314 } 315 316 public void setBootstrapServers(List<String> bootstrapServers) { 317 this.bootstrapServers = bootstrapServers; 318 } 319 320 public String getClientId() { 321 return this.clientId; 322 } 323 324 public void setClientId(String clientId) { 325 this.clientId = clientId; 326 } 327 328 public Boolean getEnableAutoCommit() { 329 return this.enableAutoCommit; 330 } 331 332 public void setEnableAutoCommit(Boolean enableAutoCommit) { 333 this.enableAutoCommit = enableAutoCommit; 334 } 335 336 public Duration getFetchMaxWait() { 337 return this.fetchMaxWait; 338 } 339 340 public void setFetchMaxWait(Duration fetchMaxWait) { 341 this.fetchMaxWait = fetchMaxWait; 342 } 343 344 public DataSize getFetchMinSize() { 345 return this.fetchMinSize; 346 } 347 348 public void setFetchMinSize(DataSize fetchMinSize) { 349 this.fetchMinSize = fetchMinSize; 350 } 351 352 public String getGroupId() { 353 return this.groupId; 354 } 355 356 public void setGroupId(String groupId) { 357 this.groupId = groupId; 358 } 359 360 public Duration getHeartbeatInterval() { 361 return this.heartbeatInterval; 362 } 363 364 public void setHeartbeatInterval(Duration heartbeatInterval) { 365 this.heartbeatInterval = heartbeatInterval; 366 } 367 368 public Class<?> getKeyDeserializer() { 369 return this.keyDeserializer; 370 } 371 372 public void setKeyDeserializer(Class<?> keyDeserializer) { 373 this.keyDeserializer = keyDeserializer; 374 } 375 376 public Class<?> getValueDeserializer() { 377 return this.valueDeserializer; 378 } 379 380 public void setValueDeserializer(Class<?> valueDeserializer) { 381 this.valueDeserializer = valueDeserializer; 382 } 383 384 public Integer getMaxPollRecords() { 385 return this.maxPollRecords; 386 } 387 388 public void setMaxPollRecords(Integer maxPollRecords) { 389 this.maxPollRecords = maxPollRecords; 390 } 391 392 public Map<String, String> getProperties() { 393 return this.properties; 394 } 395 396 public Map<String, Object> buildProperties() { 397 Properties properties = new Properties(); 398 PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); 399 map.from(this::getAutoCommitInterval).asInt(Duration::toMillis) 400 .to(properties.in(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); 401 map.from(this::getAutoOffsetReset) 402 .to(properties.in(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)); 403 map.from(this::getBootstrapServers) 404 .to(properties.in(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); 405 map.from(this::getClientId) 406 .to(properties.in(ConsumerConfig.CLIENT_ID_CONFIG)); 407 map.from(this::getEnableAutoCommit) 408 .to(properties.in(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)); 409 map.from(this::getFetchMaxWait).asInt(Duration::toMillis) 410 .to(properties.in(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG)); 411 map.from(this::getFetchMinSize).asInt(DataSize::toBytes) 412 .to(properties.in(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)); 413 map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG)); 414 map.from(this::getHeartbeatInterval).asInt(Duration::toMillis) 415 .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)); 416 map.from(this::getKeyDeserializer) 417 .to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); 418 map.from(this::getValueDeserializer) 419 .to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); 420 map.from(this::getMaxPollRecords) 421 .to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); 422 return properties.with(this.ssl, this.properties); 423 } 424 425 } 426 427 public static class Producer { 428 429 private final Ssl ssl = new Ssl(); 430 431 /** 432 * Number of acknowledgments the producer requires the leader to have received 433 * before considering a request complete. 434 */ 435 private String acks; 436 437 /** 438 * Default batch size. A small batch size will make batching less common and may 439 * reduce throughput (a batch size of zero disables batching entirely). 440 */ 441 private DataSize batchSize; 442 443 /** 444 * Comma-delimited list of host:port pairs to use for establishing the initial 445 * connections to the Kafka cluster. Overrides the global property, for producers. 446 */ 447 private List<String> bootstrapServers; 448 449 /** 450 * Total memory size the producer can use to buffer records waiting to be sent to 451 * the server. 452 */ 453 private DataSize bufferMemory; 454 455 /** 456 * ID to pass to the server when making requests. Used for server-side logging. 457 */ 458 private String clientId; 459 460 /** 461 * Compression type for all data generated by the producer. 462 */ 463 private String compressionType; 464 465 /** 466 * Serializer class for keys. 467 */ 468 private Class<?> keySerializer = StringSerializer.class; 469 470 /** 471 * Serializer class for values. 472 */ 473 private Class<?> valueSerializer = StringSerializer.class; 474 475 /** 476 * When greater than zero, enables retrying of failed sends. 477 */ 478 private Integer retries; 479 480 /** 481 * When non empty, enables transaction support for producer. 482 */ 483 private String transactionIdPrefix; 484 485 /** 486 * Additional producer-specific properties used to configure the client. 487 */ 488 private final Map<String, String> properties = new HashMap<>(); 489 490 public Ssl getSsl() { 491 return this.ssl; 492 } 493 494 public String getAcks() { 495 return this.acks; 496 } 497 498 public void setAcks(String acks) { 499 this.acks = acks; 500 } 501 502 public DataSize getBatchSize() { 503 return this.batchSize; 504 } 505 506 public void setBatchSize(DataSize batchSize) { 507 this.batchSize = batchSize; 508 } 509 510 public List<String> getBootstrapServers() { 511 return this.bootstrapServers; 512 } 513 514 public void setBootstrapServers(List<String> bootstrapServers) { 515 this.bootstrapServers = bootstrapServers; 516 } 517 518 public DataSize getBufferMemory() { 519 return this.bufferMemory; 520 } 521 522 public void setBufferMemory(DataSize bufferMemory) { 523 this.bufferMemory = bufferMemory; 524 } 525 526 public String getClientId() { 527 return this.clientId; 528 } 529 530 public void setClientId(String clientId) { 531 this.clientId = clientId; 532 } 533 534 public String getCompressionType() { 535 return this.compressionType; 536 } 537 538 public void setCompressionType(String compressionType) { 539 this.compressionType = compressionType; 540 } 541 542 public Class<?> getKeySerializer() { 543 return this.keySerializer; 544 } 545 546 public void setKeySerializer(Class<?> keySerializer) { 547 this.keySerializer = keySerializer; 548 } 549 550 public Class<?> getValueSerializer() { 551 return this.valueSerializer; 552 } 553 554 public void setValueSerializer(Class<?> valueSerializer) { 555 this.valueSerializer = valueSerializer; 556 } 557 558 public Integer getRetries() { 559 return this.retries; 560 } 561 562 public void setRetries(Integer retries) { 563 this.retries = retries; 564 } 565 566 public String getTransactionIdPrefix() { 567 return this.transactionIdPrefix; 568 } 569 570 public void setTransactionIdPrefix(String transactionIdPrefix) { 571 this.transactionIdPrefix = transactionIdPrefix; 572 } 573 574 public Map<String, String> getProperties() { 575 return this.properties; 576 } 577 578 public Map<String, Object> buildProperties() { 579 Properties properties = new Properties(); 580 PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); 581 map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG)); 582 map.from(this::getBatchSize).asInt(DataSize::toBytes) 583 .to(properties.in(ProducerConfig.BATCH_SIZE_CONFIG)); 584 map.from(this::getBootstrapServers) 585 .to(properties.in(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); 586 map.from(this::getBufferMemory).as(DataSize::toBytes) 587 .to(properties.in(ProducerConfig.BUFFER_MEMORY_CONFIG)); 588 map.from(this::getClientId) 589 .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); 590 map.from(this::getCompressionType) 591 .to(properties.in(ProducerConfig.COMPRESSION_TYPE_CONFIG)); 592 map.from(this::getKeySerializer) 593 .to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); 594 map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG)); 595 map.from(this::getValueSerializer) 596 .to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); 597 return properties.with(this.ssl, this.properties); 598 } 599 600 } 601 602 public static class Admin { 603 604 private final Ssl ssl = new Ssl(); 605 606 /** 607 * ID to pass to the server when making requests. Used for server-side logging. 608 */ 609 private String clientId; 610 611 /** 612 * Additional admin-specific properties used to configure the client. 613 */ 614 private final Map<String, String> properties = new HashMap<>(); 615 616 /** 617 * Whether to fail fast if the broker is not available on startup. 618 */ 619 private boolean failFast; 620 621 public Ssl getSsl() { 622 return this.ssl; 623 } 624 625 public String getClientId() { 626 return this.clientId; 627 } 628 629 public void setClientId(String clientId) { 630 this.clientId = clientId; 631 } 632 633 public boolean isFailFast() { 634 return this.failFast; 635 } 636 637 public void setFailFast(boolean failFast) { 638 this.failFast = failFast; 639 } 640 641 public Map<String, String> getProperties() { 642 return this.properties; 643 } 644 645 public Map<String, Object> buildProperties() { 646 Properties properties = new Properties(); 647 PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); 648 map.from(this::getClientId) 649 .to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); 650 return properties.with(this.ssl, this.properties); 651 } 652 653 } 654 655 /** 656 * High (and some medium) priority Streams properties and a general properties bucket. 657 */ 658 public static class Streams { 659 660 private final Ssl ssl = new Ssl(); 661 662 /** 663 * Kafka streams application.id property; default spring.application.name. 664 */ 665 private String applicationId; 666 667 /** 668 * Whether or not to auto-start the streams factory bean. 669 */ 670 private boolean autoStartup = true; 671 672 /** 673 * Comma-delimited list of host:port pairs to use for establishing the initial 674 * connections to the Kafka cluster. Overrides the global property, for streams. 675 */ 676 private List<String> bootstrapServers; 677 678 /** 679 * Maximum memory size to be used for buffering across all threads. 680 */ 681 private DataSize cacheMaxSizeBuffering; 682 683 /** 684 * ID to pass to the server when making requests. Used for server-side logging. 685 */ 686 private String clientId; 687 688 /** 689 * The replication factor for change log topics and repartition topics created by 690 * the stream processing application. 691 */ 692 private Integer replicationFactor; 693 694 /** 695 * Directory location for the state store. 696 */ 697 private String stateDir; 698 699 /** 700 * Additional Kafka properties used to configure the streams. 701 */ 702 private final Map<String, String> properties = new HashMap<>(); 703 704 public Ssl getSsl() { 705 return this.ssl; 706 } 707 708 public String getApplicationId() { 709 return this.applicationId; 710 } 711 712 public void setApplicationId(String applicationId) { 713 this.applicationId = applicationId; 714 } 715 716 public boolean isAutoStartup() { 717 return this.autoStartup; 718 } 719 720 public void setAutoStartup(boolean autoStartup) { 721 this.autoStartup = autoStartup; 722 } 723 724 public List<String> getBootstrapServers() { 725 return this.bootstrapServers; 726 } 727 728 public void setBootstrapServers(List<String> bootstrapServers) { 729 this.bootstrapServers = bootstrapServers; 730 } 731 732 @DeprecatedConfigurationProperty(replacement = "spring.kafka.streams.cache-max-size-buffering") 733 @Deprecated 734 public Integer getCacheMaxBytesBuffering() { 735 return (this.cacheMaxSizeBuffering != null) 736 ? (int) this.cacheMaxSizeBuffering.toBytes() : null; 737 } 738 739 @Deprecated 740 public void setCacheMaxBytesBuffering(Integer cacheMaxBytesBuffering) { 741 DataSize cacheMaxSizeBuffering = (cacheMaxBytesBuffering != null) 742 ? DataSize.ofBytes(cacheMaxBytesBuffering) : null; 743 setCacheMaxSizeBuffering(cacheMaxSizeBuffering); 744 } 745 746 public DataSize getCacheMaxSizeBuffering() { 747 return this.cacheMaxSizeBuffering; 748 } 749 750 public void setCacheMaxSizeBuffering(DataSize cacheMaxSizeBuffering) { 751 this.cacheMaxSizeBuffering = cacheMaxSizeBuffering; 752 } 753 754 public String getClientId() { 755 return this.clientId; 756 } 757 758 public void setClientId(String clientId) { 759 this.clientId = clientId; 760 } 761 762 public Integer getReplicationFactor() { 763 return this.replicationFactor; 764 } 765 766 public void setReplicationFactor(Integer replicationFactor) { 767 this.replicationFactor = replicationFactor; 768 } 769 770 public String getStateDir() { 771 return this.stateDir; 772 } 773 774 public void setStateDir(String stateDir) { 775 this.stateDir = stateDir; 776 } 777 778 public Map<String, String> getProperties() { 779 return this.properties; 780 } 781 782 public Map<String, Object> buildProperties() { 783 Properties properties = new Properties(); 784 PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); 785 map.from(this::getApplicationId).to(properties.in("application.id")); 786 map.from(this::getBootstrapServers) 787 .to(properties.in(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); 788 map.from(this::getCacheMaxSizeBuffering).asInt(DataSize::toBytes) 789 .to(properties.in("cache.max.bytes.buffering")); 790 map.from(this::getClientId) 791 .to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG)); 792 map.from(this::getReplicationFactor).to(properties.in("replication.factor")); 793 map.from(this::getStateDir).to(properties.in("state.dir")); 794 return properties.with(this.ssl, this.properties); 795 } 796 797 } 798 799 public static class Template { 800 801 /** 802 * Default topic to which messages are sent. 803 */ 804 private String defaultTopic; 805 806 public String getDefaultTopic() { 807 return this.defaultTopic; 808 } 809 810 public void setDefaultTopic(String defaultTopic) { 811 this.defaultTopic = defaultTopic; 812 } 813 814 } 815 816 public static class Listener { 817 818 public enum Type { 819 820 /** 821 * Invokes the endpoint with one ConsumerRecord at a time. 822 */ 823 SINGLE, 824 825 /** 826 * Invokes the endpoint with a batch of ConsumerRecords. 827 */ 828 BATCH 829 830 } 831 832 /** 833 * Listener type. 834 */ 835 private Type type = Type.SINGLE; 836 837 /** 838 * Listener AckMode. See the spring-kafka documentation. 839 */ 840 private AckMode ackMode; 841 842 /** 843 * Prefix for the listener's consumer client.id property. 844 */ 845 private String clientId; 846 847 /** 848 * Number of threads to run in the listener containers. 849 */ 850 private Integer concurrency; 851 852 /** 853 * Timeout to use when polling the consumer. 854 */ 855 private Duration pollTimeout; 856 857 /** 858 * Multiplier applied to "pollTimeout" to determine if a consumer is 859 * non-responsive. 860 */ 861 private Float noPollThreshold; 862 863 /** 864 * Number of records between offset commits when ackMode is "COUNT" or 865 * "COUNT_TIME". 866 */ 867 private Integer ackCount; 868 869 /** 870 * Time between offset commits when ackMode is "TIME" or "COUNT_TIME". 871 */ 872 private Duration ackTime; 873 874 /** 875 * Time between publishing idle consumer events (no data received). 876 */ 877 private Duration idleEventInterval; 878 879 /** 880 * Time between checks for non-responsive consumers. If a duration suffix is not 881 * specified, seconds will be used. 882 */ 883 @DurationUnit(ChronoUnit.SECONDS) 884 private Duration monitorInterval; 885 886 /** 887 * Whether to log the container configuration during initialization (INFO level). 888 */ 889 private Boolean logContainerConfig; 890 891 public Type getType() { 892 return this.type; 893 } 894 895 public void setType(Type type) { 896 this.type = type; 897 } 898 899 public AckMode getAckMode() { 900 return this.ackMode; 901 } 902 903 public void setAckMode(AckMode ackMode) { 904 this.ackMode = ackMode; 905 } 906 907 public String getClientId() { 908 return this.clientId; 909 } 910 911 public void setClientId(String clientId) { 912 this.clientId = clientId; 913 } 914 915 public Integer getConcurrency() { 916 return this.concurrency; 917 } 918 919 public void setConcurrency(Integer concurrency) { 920 this.concurrency = concurrency; 921 } 922 923 public Duration getPollTimeout() { 924 return this.pollTimeout; 925 } 926 927 public void setPollTimeout(Duration pollTimeout) { 928 this.pollTimeout = pollTimeout; 929 } 930 931 public Float getNoPollThreshold() { 932 return this.noPollThreshold; 933 } 934 935 public void setNoPollThreshold(Float noPollThreshold) { 936 this.noPollThreshold = noPollThreshold; 937 } 938 939 public Integer getAckCount() { 940 return this.ackCount; 941 } 942 943 public void setAckCount(Integer ackCount) { 944 this.ackCount = ackCount; 945 } 946 947 public Duration getAckTime() { 948 return this.ackTime; 949 } 950 951 public void setAckTime(Duration ackTime) { 952 this.ackTime = ackTime; 953 } 954 955 public Duration getIdleEventInterval() { 956 return this.idleEventInterval; 957 } 958 959 public void setIdleEventInterval(Duration idleEventInterval) { 960 this.idleEventInterval = idleEventInterval; 961 } 962 963 public Duration getMonitorInterval() { 964 return this.monitorInterval; 965 } 966 967 public void setMonitorInterval(Duration monitorInterval) { 968 this.monitorInterval = monitorInterval; 969 } 970 971 public Boolean getLogContainerConfig() { 972 return this.logContainerConfig; 973 } 974 975 public void setLogContainerConfig(Boolean logContainerConfig) { 976 this.logContainerConfig = logContainerConfig; 977 } 978 979 } 980 981 public static class Ssl { 982 983 /** 984 * Password of the private key in the key store file. 985 */ 986 private String keyPassword; 987 988 /** 989 * Location of the key store file. 990 */ 991 private Resource keyStoreLocation; 992 993 /** 994 * Store password for the key store file. 995 */ 996 private String keyStorePassword; 997 998 /** 999 * Type of the key store. 1000 */ 1001 private String keyStoreType; 1002 1003 /** 1004 * Location of the trust store file. 1005 */ 1006 private Resource trustStoreLocation; 1007 1008 /** 1009 * Store password for the trust store file. 1010 */ 1011 private String trustStorePassword; 1012 1013 /** 1014 * Type of the trust store. 1015 */ 1016 private String trustStoreType; 1017 1018 /** 1019 * SSL protocol to use. 1020 */ 1021 private String protocol; 1022 1023 public String getKeyPassword() { 1024 return this.keyPassword; 1025 } 1026 1027 public void setKeyPassword(String keyPassword) { 1028 this.keyPassword = keyPassword; 1029 } 1030 1031 public Resource getKeyStoreLocation() { 1032 return this.keyStoreLocation; 1033 } 1034 1035 public void setKeyStoreLocation(Resource keyStoreLocation) { 1036 this.keyStoreLocation = keyStoreLocation; 1037 } 1038 1039 public String getKeyStorePassword() { 1040 return this.keyStorePassword; 1041 } 1042 1043 public void setKeyStorePassword(String keyStorePassword) { 1044 this.keyStorePassword = keyStorePassword; 1045 } 1046 1047 public String getKeyStoreType() { 1048 return this.keyStoreType; 1049 } 1050 1051 public void setKeyStoreType(String keyStoreType) { 1052 this.keyStoreType = keyStoreType; 1053 } 1054 1055 public Resource getTrustStoreLocation() { 1056 return this.trustStoreLocation; 1057 } 1058 1059 public void setTrustStoreLocation(Resource trustStoreLocation) { 1060 this.trustStoreLocation = trustStoreLocation; 1061 } 1062 1063 public String getTrustStorePassword() { 1064 return this.trustStorePassword; 1065 } 1066 1067 public void setTrustStorePassword(String trustStorePassword) { 1068 this.trustStorePassword = trustStorePassword; 1069 } 1070 1071 public String getTrustStoreType() { 1072 return this.trustStoreType; 1073 } 1074 1075 public void setTrustStoreType(String trustStoreType) { 1076 this.trustStoreType = trustStoreType; 1077 } 1078 1079 public String getProtocol() { 1080 return this.protocol; 1081 } 1082 1083 public void setProtocol(String protocol) { 1084 this.protocol = protocol; 1085 } 1086 1087 public Map<String, Object> buildProperties() { 1088 Properties properties = new Properties(); 1089 PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); 1090 map.from(this::getKeyPassword) 1091 .to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG)); 1092 map.from(this::getKeyStoreLocation).as(this::resourceToPath) 1093 .to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); 1094 map.from(this::getKeyStorePassword) 1095 .to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); 1096 map.from(this::getKeyStoreType) 1097 .to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); 1098 map.from(this::getTrustStoreLocation).as(this::resourceToPath) 1099 .to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); 1100 map.from(this::getTrustStorePassword) 1101 .to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); 1102 map.from(this::getTrustStoreType) 1103 .to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); 1104 map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG)); 1105 return properties; 1106 } 1107 1108 private String resourceToPath(Resource resource) { 1109 try { 1110 return resource.getFile().getAbsolutePath(); 1111 } 1112 catch (IOException ex) { 1113 throw new IllegalStateException( 1114 "Resource '" + resource + "' must be on a file system", ex); 1115 } 1116 } 1117 1118 } 1119 1120 public static class Jaas { 1121 1122 /** 1123 * Whether to enable JAAS configuration. 1124 */ 1125 private boolean enabled; 1126 1127 /** 1128 * Login module. 1129 */ 1130 private String loginModule = "com.sun.security.auth.module.Krb5LoginModule"; 1131 1132 /** 1133 * Control flag for login configuration. 1134 */ 1135 private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED; 1136 1137 /** 1138 * Additional JAAS options. 1139 */ 1140 private final Map<String, String> options = new HashMap<>(); 1141 1142 public boolean isEnabled() { 1143 return this.enabled; 1144 } 1145 1146 public void setEnabled(boolean enabled) { 1147 this.enabled = enabled; 1148 } 1149 1150 public String getLoginModule() { 1151 return this.loginModule; 1152 } 1153 1154 public void setLoginModule(String loginModule) { 1155 this.loginModule = loginModule; 1156 } 1157 1158 public KafkaJaasLoginModuleInitializer.ControlFlag getControlFlag() { 1159 return this.controlFlag; 1160 } 1161 1162 public void setControlFlag( 1163 KafkaJaasLoginModuleInitializer.ControlFlag controlFlag) { 1164 this.controlFlag = controlFlag; 1165 } 1166 1167 public Map<String, String> getOptions() { 1168 return this.options; 1169 } 1170 1171 public void setOptions(Map<String, String> options) { 1172 if (options != null) { 1173 this.options.putAll(options); 1174 } 1175 } 1176 1177 } 1178 1179 @SuppressWarnings("serial") 1180 private static class Properties extends HashMap<String, Object> { 1181 1182 public <V> java.util.function.Consumer<V> in(String key) { 1183 return (value) -> put(key, value); 1184 } 1185 1186 public Properties with(Ssl ssl, Map<String, String> properties) { 1187 putAll(ssl.buildProperties()); 1188 putAll(properties); 1189 return this; 1190 } 1191 1192 } 1193 1194}