/*
 * Copyright 2012-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.boot.autoconfigure.amqp;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
import org.springframework.boot.convert.DurationUnit;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
 * Configuration properties for Rabbit.
 *
 * @author Greg Turnquist
 * @author Dave Syer
 * @author Stephane Nicoll
 * @author Andy Wilkinson
 * @author Josh Thornhill
 * @author Gary Russell
 * @author Artsiom Yudovin
 */
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {

	/**
	 * RabbitMQ host.
	 */
	private String host = "localhost";

	/**
	 * RabbitMQ port.
	 */
	private int port = 5672;

	/**
	 * Login user to authenticate to the broker.
	 */
	private String username = "guest";

	/**
	 * Login password to authenticate against the broker.
	 */
	private String password = "guest";

	/**
	 * SSL configuration.
	 */
	private final Ssl ssl = new Ssl();

	/**
	 * Virtual host to use when connecting to the broker.
	 */
	private String virtualHost;

	/**
	 * Comma-separated list of addresses to which the client should connect.
	 */
	private String addresses;

	/**
	 * Requested heartbeat timeout; zero for none. If a duration suffix is not specified,
	 * seconds will be used.
	 */
	@DurationUnit(ChronoUnit.SECONDS)
	private Duration requestedHeartbeat;

	/**
	 * Whether to enable publisher confirms.
	 */
	private boolean publisherConfirms;

	/**
	 * Whether to enable publisher returns.
	 */
	private boolean publisherReturns;

	/**
	 * Connection timeout. Set it to zero to wait forever.
	 */
	private Duration connectionTimeout;

	/**
	 * Cache configuration.
	 */
	private final Cache cache = new Cache();

	/**
	 * Listener container configuration.
	 */
	private final Listener listener = new Listener();

	private final Template template = new Template();

	// Parsed form of "addresses"; stays null until setAddresses(String) is called.
	private List<Address> parsedAddresses;

	public String getHost() {
		return this.host;
	}

	/**
	 * Returns the host from the first address, or the configured host if no addresses
	 * have been set.
	 * @return the host
	 * @see #setAddresses(String)
	 * @see #getHost()
	 */
	public String determineHost() {
		if (CollectionUtils.isEmpty(this.parsedAddresses)) {
			return getHost();
		}
		return this.parsedAddresses.get(0).host;
	}

	public void setHost(String host) {
		this.host = host;
	}

	public int getPort() {
		return this.port;
	}

	/**
	 * Returns the port from the first address, or the configured port if no addresses
	 * have been set.
	 * @return the port
	 * @see #setAddresses(String)
	 * @see #getPort()
	 */
	public int determinePort() {
		if (CollectionUtils.isEmpty(this.parsedAddresses)) {
			return getPort();
		}
		Address address = this.parsedAddresses.get(0);
		return address.port;
	}

	public void setPort(int port) {
		this.port = port;
	}

	public String getAddresses() {
		return this.addresses;
	}

	/**
	 * Returns the comma-separated addresses or a single address ({@code host:port})
	 * created from the configured host and port if no addresses have been set.
	 * @return the addresses
	 */
	public String determineAddresses() {
		if (CollectionUtils.isEmpty(this.parsedAddresses)) {
			return this.host + ":" + this.port;
		}
		List<String> addressStrings = new ArrayList<>();
		for (Address parsedAddress : this.parsedAddresses) {
			addressStrings.add(parsedAddress.host + ":" + parsedAddress.port);
		}
		return StringUtils.collectionToCommaDelimitedString(addressStrings);
	}

	/**
	 * Sets the comma-separated addresses and parses each entry so that the
	 * {@code determine*} methods can read values from the first address.
	 * @param addresses the comma-separated addresses
	 */
	public void setAddresses(String addresses) {
		this.addresses = addresses;
		this.parsedAddresses = parseAddresses(addresses);
	}

	private List<Address> parseAddresses(String addresses) {
		List<Address> parsedAddresses = new ArrayList<>();
		for (String address : StringUtils.commaDelimitedListToStringArray(addresses)) {
			parsedAddresses.add(new Address(address));
		}
		return parsedAddresses;
	}

	public String getUsername() {
		return this.username;
	}

	/**
	 * If addresses have been set and the first address has a username it is returned.
	 * Otherwise returns the result of calling {@code getUsername()}.
	 * @return the username
	 * @see #setAddresses(String)
	 * @see #getUsername()
	 */
	public String determineUsername() {
		if (CollectionUtils.isEmpty(this.parsedAddresses)) {
			return this.username;
		}
		Address address = this.parsedAddresses.get(0);
		return (address.username != null) ? address.username : this.username;
	}

	public void setUsername(String username) {
		this.username = username;
	}

	public String getPassword() {
		return this.password;
	}

	/**
	 * If addresses have been set and the first address has a password it is returned.
	 * Otherwise returns the result of calling {@code getPassword()}.
	 * @return the password or {@code null}
	 * @see #setAddresses(String)
	 * @see #getPassword()
	 */
	public String determinePassword() {
		if (CollectionUtils.isEmpty(this.parsedAddresses)) {
			return getPassword();
		}
		Address address = this.parsedAddresses.get(0);
		return (address.password != null) ? address.password : getPassword();
	}

	public void setPassword(String password) {
		this.password = password;
	}

	public Ssl getSsl() {
		return this.ssl;
	}

	public String getVirtualHost() {
		return this.virtualHost;
	}

	/**
	 * If addresses have been set and the first address has a virtual host it is returned.
	 * Otherwise returns the result of calling {@code getVirtualHost()}.
	 * @return the virtual host or {@code null}
	 * @see #setAddresses(String)
	 * @see #getVirtualHost()
	 */
	public String determineVirtualHost() {
		if (CollectionUtils.isEmpty(this.parsedAddresses)) {
			return getVirtualHost();
		}
		Address address = this.parsedAddresses.get(0);
		return (address.virtualHost != null) ? address.virtualHost : getVirtualHost();
	}

	/**
	 * Sets the virtual host. An empty string is stored as {@code "/"}.
	 * @param virtualHost the virtual host
	 */
	public void setVirtualHost(String virtualHost) {
		this.virtualHost = "".equals(virtualHost) ? "/" : virtualHost;
	}

	public Duration getRequestedHeartbeat() {
		return this.requestedHeartbeat;
	}

	public void setRequestedHeartbeat(Duration requestedHeartbeat) {
		this.requestedHeartbeat = requestedHeartbeat;
	}

	public boolean isPublisherConfirms() {
		return this.publisherConfirms;
	}

	public void setPublisherConfirms(boolean publisherConfirms) {
		this.publisherConfirms = publisherConfirms;
	}

	public boolean isPublisherReturns() {
		return this.publisherReturns;
	}

	public void setPublisherReturns(boolean publisherReturns) {
		this.publisherReturns = publisherReturns;
	}

	public Duration getConnectionTimeout() {
		return this.connectionTimeout;
	}

	public void setConnectionTimeout(Duration connectionTimeout) {
		this.connectionTimeout = connectionTimeout;
	}

	public Cache getCache() {
		return this.cache;
	}

	public Listener getListener() {
		return this.listener;
	}

	public Template getTemplate() {
		return this.template;
	}

	/**
	 * SSL related properties.
	 */
	public static class Ssl {

		/**
		 * Whether to enable SSL support.
		 */
		private boolean enabled;

		/**
		 * Path to the key store that holds the SSL certificate.
		 */
		private String keyStore;

		/**
		 * Key store type.
		 */
		private String keyStoreType = "PKCS12";

		/**
		 * Password used to access the key store.
		 */
		private String keyStorePassword;

		/**
		 * Trust store that holds SSL certificates.
		 */
		private String trustStore;

		/**
		 * Trust store type.
		 */
		private String trustStoreType = "JKS";

		/**
		 * Password used to access the trust store.
		 */
		private String trustStorePassword;

		/**
		 * SSL algorithm to use. By default, configured by the Rabbit client library.
		 */
		private String algorithm;

		/**
		 * Whether to enable server side certificate validation.
		 */
		private boolean validateServerCertificate = true;

		/**
		 * Whether to enable hostname verification.
		 */
		private boolean verifyHostname = true;

		public boolean isEnabled() {
			return this.enabled;
		}

		public void setEnabled(boolean enabled) {
			this.enabled = enabled;
		}

		public String getKeyStore() {
			return this.keyStore;
		}

		public void setKeyStore(String keyStore) {
			this.keyStore = keyStore;
		}

		public String getKeyStoreType() {
			return this.keyStoreType;
		}

		public void setKeyStoreType(String keyStoreType) {
			this.keyStoreType = keyStoreType;
		}

		public String getKeyStorePassword() {
			return this.keyStorePassword;
		}

		public void setKeyStorePassword(String keyStorePassword) {
			this.keyStorePassword = keyStorePassword;
		}

		public String getTrustStore() {
			return this.trustStore;
		}

		public void setTrustStore(String trustStore) {
			this.trustStore = trustStore;
		}

		public String getTrustStoreType() {
			return this.trustStoreType;
		}

		public void setTrustStoreType(String trustStoreType) {
			this.trustStoreType = trustStoreType;
		}

		public String getTrustStorePassword() {
			return this.trustStorePassword;
		}

		public void setTrustStorePassword(String trustStorePassword) {
			this.trustStorePassword = trustStorePassword;
		}

		public String getAlgorithm() {
			return this.algorithm;
		}

		public void setAlgorithm(String sslAlgorithm) {
			this.algorithm = sslAlgorithm;
		}

		public boolean isValidateServerCertificate() {
			return this.validateServerCertificate;
		}

		public void setValidateServerCertificate(boolean validateServerCertificate) {
			this.validateServerCertificate = validateServerCertificate;
		}

		public boolean getVerifyHostname() {
			return this.verifyHostname;
		}

		public void setVerifyHostname(boolean verifyHostname) {
			this.verifyHostname = verifyHostname;
		}

	}

	/**
	 * Channel and connection cache properties.
	 */
	public static class Cache {

		private final Channel channel = new Channel();

		private final Connection connection = new Connection();

		public Channel getChannel() {
			return this.channel;
		}

		public Connection getConnection() {
			return this.connection;
		}

		/**
		 * Channel cache properties.
		 */
		public static class Channel {

			/**
			 * Number of channels to retain in the cache. When "checkout-timeout" > 0, max
			 * channels per connection.
			 */
			private Integer size;

			/**
			 * Duration to wait to obtain a channel if the cache size has been reached. If
			 * 0, always create a new channel.
			 */
			private Duration checkoutTimeout;

			public Integer getSize() {
				return this.size;
			}

			public void setSize(Integer size) {
				this.size = size;
			}

			public Duration getCheckoutTimeout() {
				return this.checkoutTimeout;
			}

			public void setCheckoutTimeout(Duration checkoutTimeout) {
				this.checkoutTimeout = checkoutTimeout;
			}

		}

		/**
		 * Connection cache properties.
		 */
		public static class Connection {

			/**
			 * Connection factory cache mode.
			 */
			private CacheMode mode = CacheMode.CHANNEL;

			/**
			 * Number of connections to cache. Only applies when mode is CONNECTION.
			 */
			private Integer size;

			public CacheMode getMode() {
				return this.mode;
			}

			public void setMode(CacheMode mode) {
				this.mode = mode;
			}

			public Integer getSize() {
				return this.size;
			}

			public void setSize(Integer size) {
				this.size = size;
			}

		}

	}

	/**
	 * Supported listener container types.
	 */
	public enum ContainerType {

		/**
		 * Container where the RabbitMQ consumer dispatches messages to an invoker thread.
		 */
		SIMPLE,

		/**
		 * Container where the listener is invoked directly on the RabbitMQ consumer
		 * thread.
		 */
		DIRECT

	}

	/**
	 * Listener container properties, holding the selected container type and the
	 * settings of both the simple and the direct container.
	 */
	public static class Listener {

		/**
		 * Listener container type.
		 */
		private ContainerType type = ContainerType.SIMPLE;

		private final SimpleContainer simple = new SimpleContainer();

		private final DirectContainer direct = new DirectContainer();

		public ContainerType getType() {
			return this.type;
		}

		public void setType(ContainerType containerType) {
			this.type = containerType;
		}

		public SimpleContainer getSimple() {
			return this.simple;
		}

		public DirectContainer getDirect() {
			return this.direct;
		}

	}

	/**
	 * Properties shared by {@link SimpleContainer} and {@link DirectContainer}.
	 */
	public abstract static class AmqpContainer {

		/**
		 * Whether to start the container automatically on startup.
		 */
		private boolean autoStartup = true;

		/**
		 * Acknowledge mode of container.
		 */
		private AcknowledgeMode acknowledgeMode;

		/**
		 * Maximum number of unacknowledged messages that can be outstanding at each
		 * consumer.
		 */
		private Integer prefetch;

		/**
		 * Whether rejected deliveries are re-queued by default.
		 */
		private Boolean defaultRequeueRejected;

		/**
		 * How often idle container events should be published.
		 */
		private Duration idleEventInterval;

		/**
		 * Optional properties for a retry interceptor.
		 */
		private final ListenerRetry retry = new ListenerRetry();

		public boolean isAutoStartup() {
			return this.autoStartup;
		}

		public void setAutoStartup(boolean autoStartup) {
			this.autoStartup = autoStartup;
		}

		public AcknowledgeMode getAcknowledgeMode() {
			return this.acknowledgeMode;
		}

		public void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) {
			this.acknowledgeMode = acknowledgeMode;
		}

		public Integer getPrefetch() {
			return this.prefetch;
		}

		public void setPrefetch(Integer prefetch) {
			this.prefetch = prefetch;
		}

		public Boolean getDefaultRequeueRejected() {
			return this.defaultRequeueRejected;
		}

		public void setDefaultRequeueRejected(Boolean defaultRequeueRejected) {
			this.defaultRequeueRejected = defaultRequeueRejected;
		}

		public Duration getIdleEventInterval() {
			return this.idleEventInterval;
		}

		public void setIdleEventInterval(Duration idleEventInterval) {
			this.idleEventInterval = idleEventInterval;
		}

		/**
		 * Returns whether missing queues are treated as fatal. Each concrete container
		 * supplies its own value and default.
		 * @return {@code true} if missing queues are fatal
		 */
		public abstract boolean isMissingQueuesFatal();

		public ListenerRetry getRetry() {
			return this.retry;
		}

	}

	/**
	 * Configuration properties for {@code SimpleMessageListenerContainer}.
	 */
	public static class SimpleContainer extends AmqpContainer {

		/**
		 * Minimum number of listener invoker threads.
		 */
		private Integer concurrency;

		/**
		 * Maximum number of listener invoker threads.
		 */
		private Integer maxConcurrency;

		/**
		 * Number of messages to be processed between acks when the acknowledge mode is
		 * AUTO. If larger than prefetch, prefetch will be increased to this value.
		 */
		private Integer transactionSize;

		/**
		 * Whether to fail if the queues declared by the container are not available on
		 * the broker and/or whether to stop the container if one or more queues are
		 * deleted at runtime.
		 */
		private boolean missingQueuesFatal = true;

		public Integer getConcurrency() {
			return this.concurrency;
		}

		public void setConcurrency(Integer concurrency) {
			this.concurrency = concurrency;
		}

		public Integer getMaxConcurrency() {
			return this.maxConcurrency;
		}

		public void setMaxConcurrency(Integer maxConcurrency) {
			this.maxConcurrency = maxConcurrency;
		}

		public Integer getTransactionSize() {
			return this.transactionSize;
		}

		public void setTransactionSize(Integer transactionSize) {
			this.transactionSize = transactionSize;
		}

		@Override
		public boolean isMissingQueuesFatal() {
			return this.missingQueuesFatal;
		}

		public void setMissingQueuesFatal(boolean missingQueuesFatal) {
			this.missingQueuesFatal = missingQueuesFatal;
		}

	}

	/**
	 * Configuration properties for {@code DirectMessageListenerContainer}.
	 */
	public static class DirectContainer extends AmqpContainer {

		/**
		 * Number of consumers per queue.
		 */
		private Integer consumersPerQueue;

		/**
		 * Whether to fail if the queues declared by the container are not available on
		 * the broker.
		 */
		private boolean missingQueuesFatal = false;

		public Integer getConsumersPerQueue() {
			return this.consumersPerQueue;
		}

		public void setConsumersPerQueue(Integer consumersPerQueue) {
			this.consumersPerQueue = consumersPerQueue;
		}

		@Override
		public boolean isMissingQueuesFatal() {
			return this.missingQueuesFatal;
		}

		public void setMissingQueuesFatal(boolean missingQueuesFatal) {
			this.missingQueuesFatal = missingQueuesFatal;
		}

	}

	/**
	 * Template related properties.
	 */
	public static class Template {

		private final Retry retry = new Retry();

		/**
		 * Whether to enable mandatory messages.
		 */
		private Boolean mandatory;

		/**
		 * Timeout for `receive()` operations.
		 */
		private Duration receiveTimeout;

		/**
		 * Timeout for `sendAndReceive()` operations.
		 */
		private Duration replyTimeout;

		/**
		 * Name of the default exchange to use for send operations.
		 */
		private String exchange = "";

		/**
		 * Value of a default routing key to use for send operations.
		 */
		private String routingKey = "";

		/**
		 * Name of the default queue to receive messages from when none is specified
		 * explicitly.
		 */
		private String defaultReceiveQueue;

		public Retry getRetry() {
			return this.retry;
		}

		public Boolean getMandatory() {
			return this.mandatory;
		}

		public void setMandatory(Boolean mandatory) {
			this.mandatory = mandatory;
		}

		public Duration getReceiveTimeout() {
			return this.receiveTimeout;
		}

		public void setReceiveTimeout(Duration receiveTimeout) {
			this.receiveTimeout = receiveTimeout;
		}

		public Duration getReplyTimeout() {
			return this.replyTimeout;
		}

		public void setReplyTimeout(Duration replyTimeout) {
			this.replyTimeout = replyTimeout;
		}

		public String getExchange() {
			return this.exchange;
		}

		public void setExchange(String exchange) {
			this.exchange = exchange;
		}

		public String getRoutingKey() {
			return this.routingKey;
		}

		public void setRoutingKey(String routingKey) {
			this.routingKey = routingKey;
		}

		public String getDefaultReceiveQueue() {
			return this.defaultReceiveQueue;
		}

		public void setDefaultReceiveQueue(String defaultReceiveQueue) {
			this.defaultReceiveQueue = defaultReceiveQueue;
		}

		/**
		 * Returns the default receive queue.
		 * @return the default receive queue
		 * @deprecated in favor of {@link #getDefaultReceiveQueue()}
		 */
		@Deprecated
		@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.template.default-receive-queue")
		public String getQueue() {
			return getDefaultReceiveQueue();
		}

		/**
		 * Sets the default receive queue.
		 * @param queue the default receive queue
		 * @deprecated in favor of {@link #setDefaultReceiveQueue(String)}
		 */
		@Deprecated
		public void setQueue(String queue) {
			setDefaultReceiveQueue(queue);
		}

	}

	/**
	 * Retry related properties.
	 */
	public static class Retry {

		/**
		 * Whether publishing retries are enabled.
		 */
		private boolean enabled;

		/**
		 * Maximum number of attempts to deliver a message.
		 */
		private int maxAttempts = 3;

		/**
		 * Duration between the first and second attempt to deliver a message.
		 */
		private Duration initialInterval = Duration.ofMillis(1000);

		/**
		 * Multiplier to apply to the previous retry interval.
		 */
		private double multiplier = 1.0;

		/**
		 * Maximum duration between attempts.
		 */
		private Duration maxInterval = Duration.ofMillis(10000);

		public boolean isEnabled() {
			return this.enabled;
		}

		public void setEnabled(boolean enabled) {
			this.enabled = enabled;
		}

		public int getMaxAttempts() {
			return this.maxAttempts;
		}

		public void setMaxAttempts(int maxAttempts) {
			this.maxAttempts = maxAttempts;
		}

		public Duration getInitialInterval() {
			return this.initialInterval;
		}

		public void setInitialInterval(Duration initialInterval) {
			this.initialInterval = initialInterval;
		}

		public double getMultiplier() {
			return this.multiplier;
		}

		public void setMultiplier(double multiplier) {
			this.multiplier = multiplier;
		}

		public Duration getMaxInterval() {
			return this.maxInterval;
		}

		public void setMaxInterval(Duration maxInterval) {
			this.maxInterval = maxInterval;
		}

	}

	/**
	 * Retry properties for listener containers, adding the stateless flag to
	 * {@link Retry}.
	 */
	public static class ListenerRetry extends Retry {

		/**
		 * Whether retries are stateless or stateful.
		 */
		private boolean stateless = true;

		public boolean isStateless() {
			return this.stateless;
		}

		public void setStateless(boolean stateless) {
			this.stateless = stateless;
		}

	}

	/**
	 * A single parsed entry of the {@code addresses} property. The accepted form is
	 * {@code [amqp://][username[:password]@]host[:port][/virtualHost]}.
	 */
	private static final class Address {

		private static final String PREFIX_AMQP = "amqp://";

		private static final int DEFAULT_PORT = 5672;

		private String host;

		private int port;

		private String username;

		private String password;

		private String virtualHost;

		/**
		 * Parses the given address. Each step consumes its part of the input and hands
		 * the remainder to the next one, so the order of the calls matters.
		 * @param input the raw address
		 */
		private Address(String input) {
			input = input.trim();
			input = trimPrefix(input);
			input = parseUsernameAndPassword(input);
			input = parseVirtualHost(input);
			parseHostAndPort(input);
		}

		/**
		 * Removes a leading {@code amqp://} prefix, if present.
		 * @param input the address
		 * @return the address without the prefix
		 */
		private String trimPrefix(String input) {
			if (input.startsWith(PREFIX_AMQP)) {
				input = input.substring(PREFIX_AMQP.length());
			}
			return input;
		}

		/**
		 * Extracts the credentials that precede the first {@code @}. When they contain
		 * no {@code :} they are used as the username as a whole and the password is
		 * left unset.
		 * @param input the address without prefix
		 * @return the part following the {@code @}, or the input itself when it holds
		 * no credentials
		 */
		private String parseUsernameAndPassword(String input) {
			// StringUtils.split returns null when the delimiter is not found
			String[] splitInput = StringUtils.split(input, "@");
			if (splitInput == null) {
				return input;
			}
			String credentials = splitInput[0];
			String[] splitCredentials = StringUtils.split(credentials, ":");
			if (splitCredentials == null) {
				// Username only: previously this case failed with a NullPointerException
				this.username = credentials;
			}
			else {
				this.username = splitCredentials[0];
				this.password = splitCredentials[1];
			}
			return splitInput[1];
		}

		/**
		 * Extracts the virtual host that follows the first {@code /}. An empty virtual
		 * host is stored as {@code "/"}.
		 * @param input the address without prefix and credentials
		 * @return the part preceding the {@code /}, or the input itself when it holds
		 * no virtual host
		 */
		private String parseVirtualHost(String input) {
			int hostIndex = input.indexOf('/');
			if (hostIndex >= 0) {
				this.virtualHost = input.substring(hostIndex + 1);
				if (this.virtualHost.isEmpty()) {
					this.virtualHost = "/";
				}
				input = input.substring(0, hostIndex);
			}
			return input;
		}

		/**
		 * Stores the host and the port, falling back to the default port when the
		 * input holds no {@code :}.
		 * @param input the remaining {@code host[:port]} part of the address
		 * @throws NumberFormatException if the port is not a valid integer
		 */
		private void parseHostAndPort(String input) {
			int portIndex = input.indexOf(':');
			if (portIndex == -1) {
				this.host = input;
				this.port = DEFAULT_PORT;
			}
			else {
				this.host = input.substring(0, portIndex);
				this.port = Integer.parseInt(input.substring(portIndex + 1));
			}
		}

	}

}