001/*
002 * Copyright 2002-2019 the original author or authors.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 *      https://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package org.springframework.messaging.simp.config;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.springframework.beans.BeanUtils;
026import org.springframework.beans.factory.BeanInitializationException;
027import org.springframework.context.ApplicationContext;
028import org.springframework.context.ApplicationContextAware;
029import org.springframework.context.annotation.Bean;
030import org.springframework.context.event.SmartApplicationListener;
031import org.springframework.core.task.TaskExecutor;
032import org.springframework.lang.Nullable;
033import org.springframework.messaging.MessageHandler;
034import org.springframework.messaging.converter.ByteArrayMessageConverter;
035import org.springframework.messaging.converter.CompositeMessageConverter;
036import org.springframework.messaging.converter.DefaultContentTypeResolver;
037import org.springframework.messaging.converter.MappingJackson2MessageConverter;
038import org.springframework.messaging.converter.MessageConverter;
039import org.springframework.messaging.converter.StringMessageConverter;
040import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
041import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
042import org.springframework.messaging.simp.SimpLogging;
043import org.springframework.messaging.simp.SimpMessagingTemplate;
044import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
045import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
046import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
047import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
048import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
049import org.springframework.messaging.simp.user.MultiServerUserRegistry;
050import org.springframework.messaging.simp.user.SimpUserRegistry;
051import org.springframework.messaging.simp.user.UserDestinationMessageHandler;
052import org.springframework.messaging.simp.user.UserDestinationResolver;
053import org.springframework.messaging.simp.user.UserRegistryMessageHandler;
054import org.springframework.messaging.support.AbstractSubscribableChannel;
055import org.springframework.messaging.support.ExecutorSubscribableChannel;
056import org.springframework.messaging.support.ImmutableMessageChannelInterceptor;
057import org.springframework.scheduling.TaskScheduler;
058import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
059import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
060import org.springframework.util.Assert;
061import org.springframework.util.ClassUtils;
062import org.springframework.util.MimeTypeUtils;
063import org.springframework.util.PathMatcher;
064import org.springframework.validation.Errors;
065import org.springframework.validation.Validator;
066
067/**
068 * Provides essential configuration for handling messages with simple messaging
069 * protocols such as STOMP.
070 *
071 * <p>{@link #clientInboundChannel()} and {@link #clientOutboundChannel()} deliver
072 * messages to and from remote clients to several message handlers such as the
073 * following.
074 * <ul>
075 * <li>{@link #simpAnnotationMethodMessageHandler()}</li>
076 * <li>{@link #simpleBrokerMessageHandler()}</li>
077 * <li>{@link #stompBrokerRelayMessageHandler()}</li>
078 * <li>{@link #userDestinationMessageHandler()}</li>
079 * </ul>
080 *
081 * <p>{@link #brokerChannel()} delivers messages from within the application to the
082 * the respective message handlers. {@link #brokerMessagingTemplate()} can be injected
083 * into any application component to send messages.
084 *
085 * <p>Subclasses are responsible for the parts of the configuration that feed messages
086 * to and from the client inbound/outbound channels (e.g. STOMP over WebSocket).
087 *
088 * @author Rossen Stoyanchev
089 * @author Brian Clozel
090 * @since 4.0
091 */
092public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {
093
094        private static final String MVC_VALIDATOR_NAME = "mvcValidator";
095
096        private static final boolean jackson2Present = ClassUtils.isPresent(
097                        "com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader());
098
099
100        @Nullable
101        private ApplicationContext applicationContext;
102
103        @Nullable
104        private ChannelRegistration clientInboundChannelRegistration;
105
106        @Nullable
107        private ChannelRegistration clientOutboundChannelRegistration;
108
109        @Nullable
110        private MessageBrokerRegistry brokerRegistry;
111
112
113        /**
114         * Protected constructor.
115         */
116        protected AbstractMessageBrokerConfiguration() {
117        }
118
119
120        @Override
121        public void setApplicationContext(@Nullable ApplicationContext applicationContext) {
122                this.applicationContext = applicationContext;
123        }
124
125        @Nullable
126        public ApplicationContext getApplicationContext() {
127                return this.applicationContext;
128        }
129
130
131        @Bean
132        public AbstractSubscribableChannel clientInboundChannel() {
133                ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
134                channel.setLogger(SimpLogging.forLog(channel.getLogger()));
135                ChannelRegistration reg = getClientInboundChannelRegistration();
136                if (reg.hasInterceptors()) {
137                        channel.setInterceptors(reg.getInterceptors());
138                }
139                return channel;
140        }
141
142        @Bean
143        public TaskExecutor clientInboundChannelExecutor() {
144                TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor();
145                ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
146                executor.setThreadNamePrefix("clientInboundChannel-");
147                return executor;
148        }
149
150        protected final ChannelRegistration getClientInboundChannelRegistration() {
151                if (this.clientInboundChannelRegistration == null) {
152                        ChannelRegistration registration = new ChannelRegistration();
153                        configureClientInboundChannel(registration);
154                        registration.interceptors(new ImmutableMessageChannelInterceptor());
155                        this.clientInboundChannelRegistration = registration;
156                }
157                return this.clientInboundChannelRegistration;
158        }
159
160        /**
161         * A hook for subclasses to customize the message channel for inbound messages
162         * from WebSocket clients.
163         */
164        protected void configureClientInboundChannel(ChannelRegistration registration) {
165        }
166
167        @Bean
168        public AbstractSubscribableChannel clientOutboundChannel() {
169                ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
170                channel.setLogger(SimpLogging.forLog(channel.getLogger()));
171                ChannelRegistration reg = getClientOutboundChannelRegistration();
172                if (reg.hasInterceptors()) {
173                        channel.setInterceptors(reg.getInterceptors());
174                }
175                return channel;
176        }
177
178        @Bean
179        public TaskExecutor clientOutboundChannelExecutor() {
180                TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor();
181                ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
182                executor.setThreadNamePrefix("clientOutboundChannel-");
183                return executor;
184        }
185
186        protected final ChannelRegistration getClientOutboundChannelRegistration() {
187                if (this.clientOutboundChannelRegistration == null) {
188                        ChannelRegistration registration = new ChannelRegistration();
189                        configureClientOutboundChannel(registration);
190                        registration.interceptors(new ImmutableMessageChannelInterceptor());
191                        this.clientOutboundChannelRegistration = registration;
192                }
193                return this.clientOutboundChannelRegistration;
194        }
195
196        /**
197         * A hook for subclasses to customize the message channel for messages from
198         * the application or message broker to WebSocket clients.
199         */
200        protected void configureClientOutboundChannel(ChannelRegistration registration) {
201        }
202
203        @Bean
204        public AbstractSubscribableChannel brokerChannel() {
205                ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
206                ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
207                                new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel());
208                reg.interceptors(new ImmutableMessageChannelInterceptor());
209                channel.setLogger(SimpLogging.forLog(channel.getLogger()));
210                channel.setInterceptors(reg.getInterceptors());
211                return channel;
212        }
213
214        @Bean
215        public TaskExecutor brokerChannelExecutor() {
216                ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
217                ThreadPoolTaskExecutor executor;
218                if (reg.hasTaskExecutor()) {
219                        executor = reg.taskExecutor().getTaskExecutor();
220                }
221                else {
222                        // Should never be used
223                        executor = new ThreadPoolTaskExecutor();
224                        executor.setCorePoolSize(0);
225                        executor.setMaxPoolSize(1);
226                        executor.setQueueCapacity(0);
227                }
228                executor.setThreadNamePrefix("brokerChannel-");
229                return executor;
230        }
231
232        /**
233         * An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation
234         * and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}.
235         */
236        protected final MessageBrokerRegistry getBrokerRegistry() {
237                if (this.brokerRegistry == null) {
238                        MessageBrokerRegistry registry = new MessageBrokerRegistry(clientInboundChannel(), clientOutboundChannel());
239                        configureMessageBroker(registry);
240                        this.brokerRegistry = registry;
241                }
242                return this.brokerRegistry;
243        }
244
245        /**
246         * A hook for subclasses to customize message broker configuration through the
247         * provided {@link MessageBrokerRegistry} instance.
248         */
249        protected void configureMessageBroker(MessageBrokerRegistry registry) {
250        }
251
252        /**
253         * Provide access to the configured PatchMatcher for access from other
254         * configuration classes.
255         */
256        @Nullable
257        public final PathMatcher getPathMatcher() {
258                return getBrokerRegistry().getPathMatcher();
259        }
260
261        @Bean
262        public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
263                SimpAnnotationMethodMessageHandler handler = createAnnotationMethodMessageHandler();
264                handler.setDestinationPrefixes(getBrokerRegistry().getApplicationDestinationPrefixes());
265                handler.setMessageConverter(brokerMessageConverter());
266                handler.setValidator(simpValidator());
267
268                List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
269                addArgumentResolvers(argumentResolvers);
270                handler.setCustomArgumentResolvers(argumentResolvers);
271
272                List<HandlerMethodReturnValueHandler> returnValueHandlers = new ArrayList<>();
273                addReturnValueHandlers(returnValueHandlers);
274                handler.setCustomReturnValueHandlers(returnValueHandlers);
275
276                PathMatcher pathMatcher = getBrokerRegistry().getPathMatcher();
277                if (pathMatcher != null) {
278                        handler.setPathMatcher(pathMatcher);
279                }
280                return handler;
281        }
282
283        /**
284         * Protected method for plugging in a custom subclass of
285         * {@link org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler
286         * SimpAnnotationMethodMessageHandler}.
287         * @since 4.2
288         */
289        protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
290                return new SimpAnnotationMethodMessageHandler(clientInboundChannel(),
291                                clientOutboundChannel(), brokerMessagingTemplate());
292        }
293
294        protected void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
295        }
296
297        protected void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
298        }
299
300        @Bean
301        @Nullable
302        public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
303                SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker(brokerChannel());
304                if (handler == null) {
305                        return null;
306                }
307                updateUserDestinationResolver(handler);
308                return handler;
309        }
310
311        private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler) {
312                Collection<String> prefixes = handler.getDestinationPrefixes();
313                if (!prefixes.isEmpty() && !prefixes.iterator().next().startsWith("/")) {
314                        ((DefaultUserDestinationResolver) userDestinationResolver()).setRemoveLeadingSlash(true);
315                }
316        }
317
318        @Bean
319        @Nullable
320        public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
321                StompBrokerRelayMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
322                if (handler == null) {
323                        return null;
324                }
325                Map<String, MessageHandler> subscriptions = new HashMap<>(4);
326                String destination = getBrokerRegistry().getUserDestinationBroadcast();
327                if (destination != null) {
328                        subscriptions.put(destination, userDestinationMessageHandler());
329                }
330                destination = getBrokerRegistry().getUserRegistryBroadcast();
331                if (destination != null) {
332                        subscriptions.put(destination, userRegistryMessageHandler());
333                }
334                handler.setSystemSubscriptions(subscriptions);
335                updateUserDestinationResolver(handler);
336                return handler;
337        }
338
339        @Bean
340        public UserDestinationMessageHandler userDestinationMessageHandler() {
341                UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel(),
342                                brokerChannel(), userDestinationResolver());
343                String destination = getBrokerRegistry().getUserDestinationBroadcast();
344                if (destination != null) {
345                        handler.setBroadcastDestination(destination);
346                }
347                return handler;
348        }
349
350        @Bean
351        @Nullable
352        public MessageHandler userRegistryMessageHandler() {
353                if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
354                        return null;
355                }
356                SimpUserRegistry userRegistry = userRegistry();
357                Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
358                return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
359                                brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
360                                messageBrokerTaskScheduler());
361        }
362
363        // Expose alias for 4.1 compatibility
364        @Bean(name = {"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"})
365        public TaskScheduler messageBrokerTaskScheduler() {
366                ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
367                scheduler.setThreadNamePrefix("MessageBroker-");
368                scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
369                scheduler.setRemoveOnCancelPolicy(true);
370                return scheduler;
371        }
372
373        @Bean
374        public SimpMessagingTemplate brokerMessagingTemplate() {
375                SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
376                String prefix = getBrokerRegistry().getUserDestinationPrefix();
377                if (prefix != null) {
378                        template.setUserDestinationPrefix(prefix);
379                }
380                template.setMessageConverter(brokerMessageConverter());
381                return template;
382        }
383
384        @Bean
385        public CompositeMessageConverter brokerMessageConverter() {
386                List<MessageConverter> converters = new ArrayList<>();
387                boolean registerDefaults = configureMessageConverters(converters);
388                if (registerDefaults) {
389                        converters.add(new StringMessageConverter());
390                        converters.add(new ByteArrayMessageConverter());
391                        if (jackson2Present) {
392                                converters.add(createJacksonConverter());
393                        }
394                }
395                return new CompositeMessageConverter(converters);
396        }
397
398        protected MappingJackson2MessageConverter createJacksonConverter() {
399                DefaultContentTypeResolver resolver = new DefaultContentTypeResolver();
400                resolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON);
401                MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
402                converter.setContentTypeResolver(resolver);
403                return converter;
404        }
405
406        /**
407         * Override this method to add custom message converters.
408         * @param messageConverters the list to add converters to, initially empty
409         * @return {@code true} if default message converters should be added to list,
410         * {@code false} if no more converters should be added
411         */
412        protected boolean configureMessageConverters(List<MessageConverter> messageConverters) {
413                return true;
414        }
415
416        @Bean
417        public UserDestinationResolver userDestinationResolver() {
418                DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userRegistry());
419                String prefix = getBrokerRegistry().getUserDestinationPrefix();
420                if (prefix != null) {
421                        resolver.setUserDestinationPrefix(prefix);
422                }
423                return resolver;
424        }
425
426        @Bean
427        @SuppressWarnings("deprecation")
428        public SimpUserRegistry userRegistry() {
429                SimpUserRegistry registry = createLocalUserRegistry();
430                if (registry == null) {
431                        registry = createLocalUserRegistry(getBrokerRegistry().getUserRegistryOrder());
432                }
433                boolean broadcast = getBrokerRegistry().getUserRegistryBroadcast() != null;
434                return (broadcast ? new MultiServerUserRegistry(registry) : registry);
435        }
436
437        /**
438         * Create the user registry that provides access to local users.
439         * @deprecated as of 5.1 in favor of {@link #createLocalUserRegistry(Integer)}
440         */
441        @Deprecated
442        @Nullable
443        protected SimpUserRegistry createLocalUserRegistry() {
444                return null;
445        }
446
447        /**
448         * Create the user registry that provides access to local users.
449         * @param order the order to use as a {@link SmartApplicationListener}.
450         * @since 5.1
451         */
452        protected abstract SimpUserRegistry createLocalUserRegistry(@Nullable Integer order);
453
454        /**
455         * Return an {@link org.springframework.validation.Validator} instance for
456         * validating {@code @Payload} method arguments.
457         * <p>In order, this method tries to get a Validator instance:
458         * <ul>
459         * <li>delegating to getValidator() first</li>
460         * <li>if none returned, getting an existing instance with its well-known name "mvcValidator",
461         * created by an MVC configuration</li>
462         * <li>if none returned, checking the classpath for the presence of a JSR-303 implementation
463         * before creating a {@code OptionalValidatorFactoryBean}</li>
464         * <li>returning a no-op Validator instance</li>
465         * </ul>
466         */
467        protected Validator simpValidator() {
468                Validator validator = getValidator();
469                if (validator == null) {
470                        if (this.applicationContext != null && this.applicationContext.containsBean(MVC_VALIDATOR_NAME)) {
471                                validator = this.applicationContext.getBean(MVC_VALIDATOR_NAME, Validator.class);
472                        }
473                        else if (ClassUtils.isPresent("javax.validation.Validator", getClass().getClassLoader())) {
474                                Class<?> clazz;
475                                try {
476                                        String className = "org.springframework.validation.beanvalidation.OptionalValidatorFactoryBean";
477                                        clazz = ClassUtils.forName(className, AbstractMessageBrokerConfiguration.class.getClassLoader());
478                                }
479                                catch (Throwable ex) {
480                                        throw new BeanInitializationException("Could not find default validator class", ex);
481                                }
482                                validator = (Validator) BeanUtils.instantiateClass(clazz);
483                        }
484                        else {
485                                validator = new Validator() {
486                                        @Override
487                                        public boolean supports(Class<?> clazz) {
488                                                return false;
489                                        }
490                                        @Override
491                                        public void validate(@Nullable Object target, Errors errors) {
492                                        }
493                                };
494                        }
495                }
496                return validator;
497        }
498
499        /**
500         * Override this method to provide a custom {@link Validator}.
501         * @since 4.0.1
502         */
503        @Nullable
504        public Validator getValidator() {
505                return null;
506        }
507
508}