001/* 002 * Copyright 2002-2017 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.config; 018 019import java.util.Arrays; 020import java.util.Collection; 021 022import org.springframework.messaging.MessageChannel; 023import org.springframework.messaging.SubscribableChannel; 024import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler; 025import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler; 026import org.springframework.util.Assert; 027import org.springframework.util.PathMatcher; 028 029/** 030 * A registry for configuring message broker options. 031 * 032 * @author Rossen Stoyanchev 033 * @author Sebastien Deleuze 034 * @since 4.0 035 */ 036public class MessageBrokerRegistry { 037 038 private final SubscribableChannel clientInboundChannel; 039 040 private final MessageChannel clientOutboundChannel; 041 042 private SimpleBrokerRegistration simpleBrokerRegistration; 043 044 private StompBrokerRelayRegistration brokerRelayRegistration; 045 046 private final ChannelRegistration brokerChannelRegistration = new ChannelRegistration(); 047 048 private String[] applicationDestinationPrefixes; 049 050 private String userDestinationPrefix; 051 052 private PathMatcher pathMatcher; 053 054 private Integer cacheLimit; 055 056 057 public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) { 058 Assert.notNull(clientInboundChannel, "Inbound channel must not be null"); 059 Assert.notNull(clientOutboundChannel, "Outbound channel must not be null"); 060 this.clientInboundChannel = clientInboundChannel; 061 this.clientOutboundChannel = clientOutboundChannel; 062 } 063 064 065 /** 066 * Enable a simple message broker and configure one or more prefixes to filter 067 * destinations targeting the broker (e.g. destinations prefixed with "/topic"). 068 */ 069 public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) { 070 this.simpleBrokerRegistration = new SimpleBrokerRegistration( 071 this.clientInboundChannel, this.clientOutboundChannel, destinationPrefixes); 072 return this.simpleBrokerRegistration; 073 } 074 075 /** 076 * Enable a STOMP broker relay and configure the destination prefixes supported by the 077 * message broker. Check the STOMP documentation of the message broker for supported 078 * destinations. 079 */ 080 public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) { 081 this.brokerRelayRegistration = new StompBrokerRelayRegistration( 082 this.clientInboundChannel, this.clientOutboundChannel, destinationPrefixes); 083 return this.brokerRelayRegistration; 084 } 085 086 /** 087 * Customize the channel used to send messages from the application to the message 088 * broker. By default, messages from the application to the message broker are sent 089 * synchronously, which means application code sending a message will find out 090 * if the message cannot be sent through an exception. However, this can be changed 091 * if the broker channel is configured here with task executor properties. 092 */ 093 public ChannelRegistration configureBrokerChannel() { 094 return this.brokerChannelRegistration; 095 } 096 097 protected ChannelRegistration getBrokerChannelRegistration() { 098 return this.brokerChannelRegistration; 099 } 100 101 protected String getUserDestinationBroadcast() { 102 return (this.brokerRelayRegistration != null ? 103 this.brokerRelayRegistration.getUserDestinationBroadcast() : null); 104 } 105 106 protected String getUserRegistryBroadcast() { 107 return (this.brokerRelayRegistration != null ? 108 this.brokerRelayRegistration.getUserRegistryBroadcast() : null); 109 } 110 111 /** 112 * Configure one or more prefixes to filter destinations targeting application 113 * annotated methods. For example destinations prefixed with "/app" may be 114 * processed by annotated methods while other destinations may target the 115 * message broker (e.g. "/topic", "/queue"). 116 * <p>When messages are processed, the matching prefix is removed from the destination 117 * in order to form the lookup path. This means annotations should not contain the 118 * destination prefix. 119 * <p>Prefixes that do not have a trailing slash will have one automatically appended. 120 */ 121 public MessageBrokerRegistry setApplicationDestinationPrefixes(String... prefixes) { 122 this.applicationDestinationPrefixes = prefixes; 123 return this; 124 } 125 126 protected Collection<String> getApplicationDestinationPrefixes() { 127 return (this.applicationDestinationPrefixes != null ? 128 Arrays.asList(this.applicationDestinationPrefixes) : null); 129 } 130 131 /** 132 * Configure the prefix used to identify user destinations. User destinations 133 * provide the ability for a user to subscribe to queue names unique to their 134 * session as well as for others to send messages to those unique, 135 * user-specific queues. 136 * <p>For example when a user attempts to subscribe to "/user/queue/position-updates", 137 * the destination may be translated to "/queue/position-updatesi9oqdfzo" yielding a 138 * unique queue name that does not collide with any other user attempting to do the same. 139 * Subsequently when messages are sent to "/user/{username}/queue/position-updates", 140 * the destination is translated to "/queue/position-updatesi9oqdfzo". 141 * <p>The default prefix used to identify such destinations is "/user/". 142 */ 143 public MessageBrokerRegistry setUserDestinationPrefix(String destinationPrefix) { 144 this.userDestinationPrefix = destinationPrefix; 145 return this; 146 } 147 148 protected String getUserDestinationPrefix() { 149 return this.userDestinationPrefix; 150 } 151 152 /** 153 * Configure the PathMatcher to use to match the destinations of incoming 154 * messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods. 155 * <p>By default {@link org.springframework.util.AntPathMatcher} is configured. 156 * However applications may provide an {@code AntPathMatcher} instance 157 * customized to use "." (commonly used in messaging) instead of "/" as path 158 * separator or provide a completely different PathMatcher implementation. 159 * <p>Note that the configured PathMatcher is only used for matching the 160 * portion of the destination after the configured prefix. For example given 161 * application destination prefix "/app" and destination "/app/price.stock.**", 162 * the message might be mapped to a controller with "price" and "stock.**" 163 * as its type and method-level mappings respectively. 164 * <p>When the simple broker is enabled, the PathMatcher configured here is 165 * also used to match message destinations when brokering messages. 166 * @since 4.1 167 * @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setPathMatcher 168 */ 169 public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher) { 170 this.pathMatcher = pathMatcher; 171 return this; 172 } 173 174 protected PathMatcher getPathMatcher() { 175 return this.pathMatcher; 176 } 177 178 /** 179 * Configure the cache limit to apply for registrations with the broker. 180 * <p>This is currently only applied for the destination cache in the 181 * subscription registry. The default cache limit there is 1024. 182 * @since 4.3.2 183 * @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setCacheLimit 184 */ 185 public MessageBrokerRegistry setCacheLimit(int cacheLimit) { 186 this.cacheLimit = cacheLimit; 187 return this; 188 } 189 190 191 protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) { 192 if (this.simpleBrokerRegistration == null && this.brokerRelayRegistration == null) { 193 enableSimpleBroker(); 194 } 195 if (this.simpleBrokerRegistration != null) { 196 SimpleBrokerMessageHandler handler = this.simpleBrokerRegistration.getMessageHandler(brokerChannel); 197 handler.setPathMatcher(this.pathMatcher); 198 handler.setCacheLimit(this.cacheLimit); 199 return handler; 200 } 201 return null; 202 } 203 204 protected StompBrokerRelayMessageHandler getStompBrokerRelay(SubscribableChannel brokerChannel) { 205 if (this.brokerRelayRegistration != null) { 206 return this.brokerRelayRegistration.getMessageHandler(brokerChannel); 207 } 208 return null; 209 } 210 211}