001/*
002 * Copyright 2002-2018 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.config;
018
019import java.util.Arrays;
020import java.util.Collection;
021
022import org.springframework.context.event.SmartApplicationListener;
023import org.springframework.lang.Nullable;
024import org.springframework.messaging.MessageChannel;
025import org.springframework.messaging.SubscribableChannel;
026import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
027import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
028import org.springframework.util.Assert;
029import org.springframework.util.PathMatcher;
030
031/**
032 * A registry for configuring message broker options.
033 *
034 * @author Rossen Stoyanchev
035 * @author Sebastien Deleuze
036 * @since 4.0
037 */
038public class MessageBrokerRegistry {
039
040        private final SubscribableChannel clientInboundChannel;
041
042        private final MessageChannel clientOutboundChannel;
043
044        @Nullable
045        private SimpleBrokerRegistration simpleBrokerRegistration;
046
047        @Nullable
048        private StompBrokerRelayRegistration brokerRelayRegistration;
049
050        private final ChannelRegistration brokerChannelRegistration = new ChannelRegistration();
051
052        @Nullable
053        private String[] applicationDestinationPrefixes;
054
055        @Nullable
056        private String userDestinationPrefix;
057
058        @Nullable
059        private Integer userRegistryOrder;
060
061        @Nullable
062        private PathMatcher pathMatcher;
063
064        @Nullable
065        private Integer cacheLimit;
066
067        private boolean preservePublishOrder;
068
069
070        public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) {
071                Assert.notNull(clientInboundChannel, "Inbound channel must not be null");
072                Assert.notNull(clientOutboundChannel, "Outbound channel must not be null");
073                this.clientInboundChannel = clientInboundChannel;
074                this.clientOutboundChannel = clientOutboundChannel;
075        }
076
077
078        /**
079         * Enable a simple message broker and configure one or more prefixes to filter
080         * destinations targeting the broker (e.g. destinations prefixed with "/topic").
081         */
082        public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes) {
083                this.simpleBrokerRegistration = new SimpleBrokerRegistration(
084                                this.clientInboundChannel, this.clientOutboundChannel, destinationPrefixes);
085                return this.simpleBrokerRegistration;
086        }
087
088        /**
089         * Enable a STOMP broker relay and configure the destination prefixes supported by the
090         * message broker. Check the STOMP documentation of the message broker for supported
091         * destinations.
092         */
093        public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes) {
094                this.brokerRelayRegistration = new StompBrokerRelayRegistration(
095                                this.clientInboundChannel, this.clientOutboundChannel, destinationPrefixes);
096                return this.brokerRelayRegistration;
097        }
098
099        /**
100         * Customize the channel used to send messages from the application to the message
101         * broker. By default, messages from the application to the message broker are sent
102         * synchronously, which means application code sending a message will find out
103         * if the message cannot be sent through an exception. However, this can be changed
104         * if the broker channel is configured here with task executor properties.
105         */
106        public ChannelRegistration configureBrokerChannel() {
107                return this.brokerChannelRegistration;
108        }
109
110        protected ChannelRegistration getBrokerChannelRegistration() {
111                return this.brokerChannelRegistration;
112        }
113
114        @Nullable
115        protected String getUserDestinationBroadcast() {
116                return (this.brokerRelayRegistration != null ?
117                                this.brokerRelayRegistration.getUserDestinationBroadcast() : null);
118        }
119
120        @Nullable
121        protected String getUserRegistryBroadcast() {
122                return (this.brokerRelayRegistration != null ?
123                                this.brokerRelayRegistration.getUserRegistryBroadcast() : null);
124        }
125
126        /**
127         * Configure one or more prefixes to filter destinations targeting application
128         * annotated methods. For example destinations prefixed with "/app" may be
129         * processed by annotated methods while other destinations may target the
130         * message broker (e.g. "/topic", "/queue").
131         * <p>When messages are processed, the matching prefix is removed from the destination
132         * in order to form the lookup path. This means annotations should not contain the
133         * destination prefix.
134         * <p>Prefixes that do not have a trailing slash will have one automatically appended.
135         */
136        public MessageBrokerRegistry setApplicationDestinationPrefixes(String... prefixes) {
137                this.applicationDestinationPrefixes = prefixes;
138                return this;
139        }
140
141        @Nullable
142        protected Collection<String> getApplicationDestinationPrefixes() {
143                return (this.applicationDestinationPrefixes != null ?
144                                Arrays.asList(this.applicationDestinationPrefixes) : null);
145        }
146
147        /**
148         * Configure the prefix used to identify user destinations. User destinations
149         * provide the ability for a user to subscribe to queue names unique to their
150         * session as well as for others to send messages to those unique,
151         * user-specific queues.
152         * <p>For example when a user attempts to subscribe to "/user/queue/position-updates",
153         * the destination may be translated to "/queue/position-updatesi9oqdfzo" yielding a
154         * unique queue name that does not collide with any other user attempting to do the same.
155         * Subsequently when messages are sent to "/user/{username}/queue/position-updates",
156         * the destination is translated to "/queue/position-updatesi9oqdfzo".
157         * <p>The default prefix used to identify such destinations is "/user/".
158         */
159        public MessageBrokerRegistry setUserDestinationPrefix(String destinationPrefix) {
160                this.userDestinationPrefix = destinationPrefix;
161                return this;
162        }
163
164        @Nullable
165        protected String getUserDestinationPrefix() {
166                return this.userDestinationPrefix;
167        }
168
169        /**
170         * Set the order for the
171         * {@link org.springframework.messaging.simp.user.SimpUserRegistry
172         * SimpUserRegistry} to use as a {@link SmartApplicationListener}.
173         * @param order the order value
174         * @since 5.0.8
175         */
176        public void setUserRegistryOrder(int order) {
177                this.userRegistryOrder = order;
178        }
179
180        @Nullable
181        protected Integer getUserRegistryOrder() {
182                return this.userRegistryOrder;
183        }
184
185        /**
186         * Configure the PathMatcher to use to match the destinations of incoming
187         * messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
188         * <p>By default {@link org.springframework.util.AntPathMatcher} is configured.
189         * However applications may provide an {@code AntPathMatcher} instance
190         * customized to use "." (commonly used in messaging) instead of "/" as path
191         * separator or provide a completely different PathMatcher implementation.
192         * <p>Note that the configured PathMatcher is only used for matching the
193         * portion of the destination after the configured prefix. For example given
194         * application destination prefix "/app" and destination "/app/price.stock.**",
195         * the message might be mapped to a controller with "price" and "stock.**"
196         * as its type and method-level mappings respectively.
197         * <p>When the simple broker is enabled, the PathMatcher configured here is
198         * also used to match message destinations when brokering messages.
199         * @since 4.1
200         * @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setPathMatcher
201         */
202        public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher) {
203                this.pathMatcher = pathMatcher;
204                return this;
205        }
206
207        @Nullable
208        protected PathMatcher getPathMatcher() {
209                return this.pathMatcher;
210        }
211
212        /**
213         * Configure the cache limit to apply for registrations with the broker.
214         * <p>This is currently only applied for the destination cache in the
215         * subscription registry. The default cache limit there is 1024.
216         * @since 4.3.2
217         * @see org.springframework.messaging.simp.broker.DefaultSubscriptionRegistry#setCacheLimit
218         */
219        public MessageBrokerRegistry setCacheLimit(int cacheLimit) {
220                this.cacheLimit = cacheLimit;
221                return this;
222        }
223
224        /**
225         * Whether the client must receive messages in the order of publication.
226         * <p>By default messages sent to the {@code "clientOutboundChannel"} may
227         * not be processed in the same order because the channel is backed by a
228         * ThreadPoolExecutor that in turn does not guarantee processing in order.
229         * <p>When this flag is set to {@code true} messages within the same session
230         * will be sent to the {@code "clientOutboundChannel"} one at a time in
231         * order to preserve the order of publication. Enable this only if needed
232         * since there is some performance overhead to keep messages in order.
233         * @since 5.1
234         */
235        public MessageBrokerRegistry setPreservePublishOrder(boolean preservePublishOrder) {
236                this.preservePublishOrder = preservePublishOrder;
237                return this;
238        }
239
240        @Nullable
241        protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
242                if (this.simpleBrokerRegistration == null && this.brokerRelayRegistration == null) {
243                        enableSimpleBroker();
244                }
245                if (this.simpleBrokerRegistration != null) {
246                        SimpleBrokerMessageHandler handler = this.simpleBrokerRegistration.getMessageHandler(brokerChannel);
247                        handler.setPathMatcher(this.pathMatcher);
248                        handler.setCacheLimit(this.cacheLimit);
249                        handler.setPreservePublishOrder(this.preservePublishOrder);
250                        return handler;
251                }
252                return null;
253        }
254
255        @Nullable
256        protected StompBrokerRelayMessageHandler getStompBrokerRelay(SubscribableChannel brokerChannel) {
257                if (this.brokerRelayRegistration != null) {
258                        StompBrokerRelayMessageHandler relay = this.brokerRelayRegistration.getMessageHandler(brokerChannel);
259                        relay.setPreservePublishOrder(this.preservePublishOrder);
260                        return relay;
261                }
262                return null;
263        }
264
265}