/*
 * Copyright 2002-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.simp.config;

import java.util.Arrays;
import java.util.Collection;

import org.springframework.context.event.SmartApplicationListener;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.PathMatcher;

/**
 * Registry through which message broker options are configured.
 *
 * <p>Collects the simple broker and/or STOMP broker relay registrations together
 * with the destination prefixes, path matcher, cache limit and ordering options,
 * and hands the resulting message handlers out via the {@code protected} accessors.
 *
 * @author Rossen Stoyanchev
 * @author Sebastien Deleuze
 * @since 4.0
 */
public class MessageBrokerRegistry {

    private final SubscribableChannel clientInboundChannel;

    private final MessageChannel clientOutboundChannel;

    private final ChannelRegistration brokerChannelRegistration = new ChannelRegistration();

    @Nullable
    private SimpleBrokerRegistration simpleBrokerRegistration;

    @Nullable
    private StompBrokerRelayRegistration brokerRelayRegistration;

    @Nullable
    private String[] applicationDestinationPrefixes;

    @Nullable
    private String userDestinationPrefix;

    @Nullable
    private Integer userRegistryOrder;

    @Nullable
    private PathMatcher pathMatcher;

    @Nullable
    private Integer cacheLimit;

    private boolean preservePublishOrder;


    /**
     * Create a registry bound to the given client channels.
     * @param clientInboundChannel the channel for messages coming from clients (required)
     * @param clientOutboundChannel the channel for messages going to clients (required)
     */
    public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) {
        // Validate both arguments before any state is assigned.
        Assert.notNull(clientInboundChannel, "Inbound channel must not be null");
        Assert.notNull(clientOutboundChannel, "Outbound channel must not be null");
        this.clientInboundChannel = clientInboundChannel;
        this.clientOutboundChannel = clientOutboundChannel;
    }


    /**
     * Turn on the simple message broker, filtering the destinations it handles
     * by the given prefixes (e.g. destinations prefixed with "/topic").
     * <p>Each call creates a new registration that replaces any earlier one.
     * @param destinationPrefixes zero or more destination prefixes for the broker
     * @return the registration, for further customization
     */
    public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) {
        SimpleBrokerRegistration registration = new SimpleBrokerRegistration(
                this.clientInboundChannel, this.clientOutboundChannel, destinationPrefixes);
        this.simpleBrokerRegistration = registration;
        return registration;
    }

    /**
     * Turn on a STOMP broker relay and declare the destination prefixes the
     * message broker supports. Consult the STOMP documentation of the message
     * broker for the destinations it supports.
     * <p>Each call creates a new registration that replaces any earlier one.
     * @param destinationPrefixes zero or more destination prefixes for the relay
     * @return the registration, for further customization
     */
    public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) {
        StompBrokerRelayRegistration registration = new StompBrokerRelayRegistration(
                this.clientInboundChannel, this.clientOutboundChannel, destinationPrefixes);
        this.brokerRelayRegistration = registration;
        return registration;
    }

    /**
     * Customize the channel that carries messages from the application to the
     * message broker. By default those messages are sent synchronously, so the
     * sending application code learns through an exception when a message
     * cannot be sent. Configuring task executor properties on the returned
     * registration changes that.
     * @return the broker channel registration held by this registry
     */
    public ChannelRegistration configureBrokerChannel() {
        return this.brokerChannelRegistration;
    }

    /**
     * Return the broker channel registration, i.e. the same instance exposed
     * by {@link #configureBrokerChannel()}.
     */
    protected ChannelRegistration getBrokerChannelRegistration() {
        return this.brokerChannelRegistration;
    }

    /**
     * Return the user destination broadcast value of the STOMP broker relay
     * registration, or {@code null} when no relay has been enabled.
     */
    @Nullable
    protected String getUserDestinationBroadcast() {
        StompBrokerRelayRegistration relayRegistration = this.brokerRelayRegistration;
        if (relayRegistration == null) {
            return null;
        }
        return relayRegistration.getUserDestinationBroadcast();
    }

    /**
     * Return the user registry broadcast value of the STOMP broker relay
     * registration, or {@code null} when no relay has been enabled.
     */
    @Nullable
    protected String getUserRegistryBroadcast() {
        StompBrokerRelayRegistration relayRegistration = this.brokerRelayRegistration;
        if (relayRegistration == null) {
            return null;
        }
        return relayRegistration.getUserRegistryBroadcast();
    }

    /**
     * Configure one or more prefixes that mark destinations as targeting
     * application annotated methods. For example, destinations starting with
     * "/app" may be handled by annotated methods while other destinations
     * (e.g. "/topic", "/queue") may target the message broker.
     * <p>When messages are processed, the matching prefix is stripped from the
     * destination to form the lookup path, so annotations should not repeat
     * the destination prefix.
     * <p>Prefixes without a trailing slash will have one appended automatically.
     * Note that this registry stores the given values as-is; any such
     * normalization is applied where the prefixes are consumed.
     * @param prefixes the application destination prefixes
     * @return this registry, for chaining
     */
    public MessageBrokerRegistry setApplicationDestinationPrefixes(String... prefixes) {
        this.applicationDestinationPrefixes = prefixes;
        return this;
    }

    /**
     * Return the configured application destination prefixes as a list view
     * over the stored array, or {@code null} if none have been set.
     */
    @Nullable
    protected Collection<String> getApplicationDestinationPrefixes() {
        String[] prefixes = this.applicationDestinationPrefixes;
        if (prefixes == null) {
            return null;
        }
        return Arrays.asList(prefixes);
    }

    /**
     * Configure the prefix that identifies user destinations. User destinations
     * let a user subscribe to queue names unique to their session, and let
     * others send messages to those unique, user-specific queues.
     * <p>For example, when a user subscribes to "/user/queue/position-updates",
     * the destination may be translated to "/queue/position-updatesi9oqdfzo",
     * a unique queue name that cannot collide with that of any other user doing
     * the same. Messages subsequently sent to
     * "/user/{username}/queue/position-updates" are then translated to
     * "/queue/position-updatesi9oqdfzo".
     * <p>The default prefix used to identify such destinations is "/user/".
     * @param destinationPrefix the user destination prefix
     * @return this registry, for chaining
     */
    public MessageBrokerRegistry setUserDestinationPrefix(String destinationPrefix) {
        this.userDestinationPrefix = destinationPrefix;
        return this;
    }

    /**
     * Return the configured user destination prefix, or {@code null} if none
     * has been set on this registry.
     */
    @Nullable
    protected String getUserDestinationPrefix() {
        return this.userDestinationPrefix;
    }

    /**
     * Set the order that the
     * {@link org.springframework.messaging.simp.user.SimpUserRegistry
     * SimpUserRegistry} should use as a {@link SmartApplicationListener}.
     * @param order the order value
     * @since 5.0.8
     */
    public void setUserRegistryOrder(int order) {
        this.userRegistryOrder = order;
    }

    /**
     * Return the configured user registry order, or {@code null} if
     * {@link #setUserRegistryOrder(int)} has not been called.
     */
    @Nullable
    protected Integer getUserRegistryOrder() {
        return this.userRegistryOrder;
    }

    /**
     * Configure the PathMatcher used to match the destinations of incoming
     * messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
     * <p>By default {@link org.springframework.util.AntPathMatcher} is configured.
     * Applications may instead supply an {@code AntPathMatcher} customized to
     * use "." (common in messaging) rather than "/" as the path separator, or
     * an entirely different PathMatcher implementation.
     * <p>The configured PathMatcher only applies to the portion of the
     * destination that follows the configured prefix. For example, given the
     * application destination prefix "/app" and the destination
     * "/app/price.stock.**", the message might be mapped to a controller with
     * "price" and "stock.**" as its type-level and method-level mappings.
     * <p>When the simple broker is enabled, the PathMatcher configured here is
     * also used to match message destinations when brokering messages.
     * @param pathMatcher the path matcher to use
     * @return this registry, for chaining
     * @since 4.1
     * @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setPathMatcher
     */
    public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher) {
        this.pathMatcher = pathMatcher;
        return this;
    }

    /**
     * Return the configured PathMatcher, or {@code null} if none has been set
     * on this registry.
     */
    @Nullable
    protected PathMatcher getPathMatcher() {
        return this.pathMatcher;
    }

    /**
     * Configure the cache limit to apply for registrations with the broker.
     * <p>At present this only affects the destination cache in the
     * subscription registry, whose default cache limit is 1024.
     * @param cacheLimit the cache limit to apply
     * @return this registry, for chaining
     * @since 4.3.2
     * @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setCacheLimit
     */
    public MessageBrokerRegistry setCacheLimit(int cacheLimit) {
        this.cacheLimit = cacheLimit;
        return this;
    }

    /**
     * Whether clients must receive messages in the order of publication.
     * <p>By default, messages sent to the {@code "clientOutboundChannel"} may
     * not be processed in the same order, because the channel is backed by a
     * ThreadPoolExecutor which does not guarantee in-order processing.
     * <p>With this flag set to {@code true}, messages within the same session
     * are sent to the {@code "clientOutboundChannel"} one at a time so that
     * publication order is preserved. Enable this only if needed, since
     * keeping messages in order carries some performance overhead.
     * @param preservePublishOrder whether to preserve publication order
     * @return this registry, for chaining
     * @since 5.1
     */
    public MessageBrokerRegistry setPreservePublishOrder(boolean preservePublishOrder) {
        this.preservePublishOrder = preservePublishOrder;
        return this;
    }

    /**
     * Build the simple broker message handler for the given broker channel.
     * <p>If neither the simple broker nor the STOMP broker relay has been
     * enabled, the simple broker is enabled first with no destination prefixes.
     * The handler is then given this registry's path matcher, cache limit and
     * publish-order flag.
     * @param brokerChannel the channel passed to the registration to obtain the handler
     * @return the configured handler, or {@code null} when there is no simple
     * broker registration (e.g. only the broker relay is enabled)
     */
    @Nullable
    protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
        boolean nothingEnabled =
                (this.simpleBrokerRegistration == null && this.brokerRelayRegistration == null);
        if (nothingEnabled) {
            // Fall back to a simple broker when no broker was configured explicitly.
            enableSimpleBroker();
        }

        // Re-read the field: it may just have been populated by the fallback above.
        SimpleBrokerRegistration registration = this.simpleBrokerRegistration;
        if (registration == null) {
            return null;
        }

        SimpleBrokerMessageHandler broker = registration.getMessageHandler(brokerChannel);
        broker.setPathMatcher(this.pathMatcher);
        broker.setCacheLimit(this.cacheLimit);
        broker.setPreservePublishOrder(this.preservePublishOrder);
        return broker;
    }

    /**
     * Build the STOMP broker relay message handler for the given broker channel
     * and apply this registry's publish-order flag to it.
     * @param brokerChannel the channel passed to the registration to obtain the handler
     * @return the configured relay handler, or {@code null} when the STOMP
     * broker relay has not been enabled
     */
    @Nullable
    protected StompBrokerRelayMessageHandler getStompBrokerRelay(SubscribableChannel brokerChannel) {
        StompBrokerRelayRegistration registration = this.brokerRelayRegistration;
        if (registration == null) {
            return null;
        }

        StompBrokerRelayMessageHandler relayHandler = registration.getMessageHandler(brokerChannel);
        relayHandler.setPreservePublishOrder(this.preservePublishOrder);
        return relayHandler;
    }

}