/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
016
017package org.springframework.messaging.simp.config;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import org.springframework.beans.BeanUtils;
026import org.springframework.beans.factory.BeanInitializationException;
027import org.springframework.context.ApplicationContext;
028import org.springframework.context.ApplicationContextAware;
029import org.springframework.context.annotation.Bean;
030import org.springframework.messaging.Message;
031import org.springframework.messaging.MessageHandler;
032import org.springframework.messaging.converter.ByteArrayMessageConverter;
033import org.springframework.messaging.converter.CompositeMessageConverter;
034import org.springframework.messaging.converter.DefaultContentTypeResolver;
035import org.springframework.messaging.converter.MappingJackson2MessageConverter;
036import org.springframework.messaging.converter.MessageConverter;
037import org.springframework.messaging.converter.StringMessageConverter;
038import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
039import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
040import org.springframework.messaging.simp.SimpMessagingTemplate;
041import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
042import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
043import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
044import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
045import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
046import org.springframework.messaging.simp.user.MultiServerUserRegistry;
047import org.springframework.messaging.simp.user.SimpUserRegistry;
048import org.springframework.messaging.simp.user.UserDestinationMessageHandler;
049import org.springframework.messaging.simp.user.UserDestinationResolver;
050import org.springframework.messaging.simp.user.UserRegistryMessageHandler;
051import org.springframework.messaging.support.AbstractSubscribableChannel;
052import org.springframework.messaging.support.ExecutorSubscribableChannel;
053import org.springframework.messaging.support.ImmutableMessageChannelInterceptor;
054import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
055import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
056import org.springframework.util.Assert;
057import org.springframework.util.ClassUtils;
058import org.springframework.util.MimeTypeUtils;
059import org.springframework.util.PathMatcher;
060import org.springframework.validation.Errors;
061import org.springframework.validation.Validator;
062
/**
 * Provides essential configuration for handling messages with simple messaging
 * protocols such as STOMP.
 *
 * <p>{@link #clientInboundChannel()} and {@link #clientOutboundChannel()} deliver
 * messages to and from remote clients to several message handlers such as the
 * following.
 * <ul>
 * <li>{@link #simpAnnotationMethodMessageHandler()}</li>
 * <li>{@link #simpleBrokerMessageHandler()}</li>
 * <li>{@link #stompBrokerRelayMessageHandler()}</li>
 * <li>{@link #userDestinationMessageHandler()}</li>
 * </ul>
 *
 * <p>{@link #brokerChannel()} delivers messages from within the application to
 * the respective message handlers. {@link #brokerMessagingTemplate()} can be injected
 * into any application component to send messages.
 *
 * <p>Subclasses are responsible for the parts of the configuration that feed messages
 * to and from the client inbound/outbound channels (e.g. STOMP over WebSocket).
 *
 * @author Rossen Stoyanchev
 * @author Brian Clozel
 * @since 4.0
 */
088public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {
089
090        private static final String MVC_VALIDATOR_NAME = "mvcValidator";
091
092        private static final boolean jackson2Present = ClassUtils.isPresent(
093                        "com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader());
094
095
096        private ApplicationContext applicationContext;
097
098        private ChannelRegistration clientInboundChannelRegistration;
099
100        private ChannelRegistration clientOutboundChannelRegistration;
101
102        private MessageBrokerRegistry brokerRegistry;
103
104
105        /**
106         * Protected constructor.
107         */
108        protected AbstractMessageBrokerConfiguration() {
109        }
110
111
112        @Override
113        public void setApplicationContext(ApplicationContext applicationContext) {
114                this.applicationContext = applicationContext;
115        }
116
117        public ApplicationContext getApplicationContext() {
118                return this.applicationContext;
119        }
120
121
122        @Bean
123        public AbstractSubscribableChannel clientInboundChannel() {
124                ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
125                ChannelRegistration reg = getClientInboundChannelRegistration();
126                if (reg.hasInterceptors()) {
127                        channel.setInterceptors(reg.getInterceptors());
128                }
129                return channel;
130        }
131
132        @Bean
133        public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
134                TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor();
135                ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
136                executor.setThreadNamePrefix("clientInboundChannel-");
137                return executor;
138        }
139
140        protected final ChannelRegistration getClientInboundChannelRegistration() {
141                if (this.clientInboundChannelRegistration == null) {
142                        ChannelRegistration registration = new ChannelRegistration();
143                        configureClientInboundChannel(registration);
144                        registration.interceptors(new ImmutableMessageChannelInterceptor());
145                        this.clientInboundChannelRegistration = registration;
146                }
147                return this.clientInboundChannelRegistration;
148        }
149
150        /**
151         * A hook for subclasses to customize the message channel for inbound messages
152         * from WebSocket clients.
153         */
154        protected void configureClientInboundChannel(ChannelRegistration registration) {
155        }
156
157        @Bean
158        public AbstractSubscribableChannel clientOutboundChannel() {
159                ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
160                ChannelRegistration reg = getClientOutboundChannelRegistration();
161                if (reg.hasInterceptors()) {
162                        channel.setInterceptors(reg.getInterceptors());
163                }
164                return channel;
165        }
166
167        @Bean
168        public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
169                TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor();
170                ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
171                executor.setThreadNamePrefix("clientOutboundChannel-");
172                return executor;
173        }
174
175        protected final ChannelRegistration getClientOutboundChannelRegistration() {
176                if (this.clientOutboundChannelRegistration == null) {
177                        ChannelRegistration registration = new ChannelRegistration();
178                        configureClientOutboundChannel(registration);
179                        registration.interceptors(new ImmutableMessageChannelInterceptor());
180                        this.clientOutboundChannelRegistration = registration;
181                }
182                return this.clientOutboundChannelRegistration;
183        }
184
185        /**
186         * A hook for subclasses to customize the message channel for messages from
187         * the application or message broker to WebSocket clients.
188         */
189        protected void configureClientOutboundChannel(ChannelRegistration registration) {
190        }
191
192        @Bean
193        public AbstractSubscribableChannel brokerChannel() {
194                ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
195                ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
196                                new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel());
197                reg.interceptors(new ImmutableMessageChannelInterceptor());
198                channel.setInterceptors(reg.getInterceptors());
199                return channel;
200        }
201
202        @Bean
203        public ThreadPoolTaskExecutor brokerChannelExecutor() {
204                ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
205                ThreadPoolTaskExecutor executor;
206                if (reg.hasTaskExecutor()) {
207                        executor = reg.taskExecutor().getTaskExecutor();
208                }
209                else {
210                        // Should never be used
211                        executor = new ThreadPoolTaskExecutor();
212                        executor.setCorePoolSize(0);
213                        executor.setMaxPoolSize(1);
214                        executor.setQueueCapacity(0);
215                }
216                executor.setThreadNamePrefix("brokerChannel-");
217                return executor;
218        }
219
220        /**
221         * An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation
222         * and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}.
223         */
224        protected final MessageBrokerRegistry getBrokerRegistry() {
225                if (this.brokerRegistry == null) {
226                        MessageBrokerRegistry registry = new MessageBrokerRegistry(clientInboundChannel(), clientOutboundChannel());
227                        configureMessageBroker(registry);
228                        this.brokerRegistry = registry;
229                }
230                return this.brokerRegistry;
231        }
232
233        /**
234         * A hook for subclasses to customize message broker configuration through the
235         * provided {@link MessageBrokerRegistry} instance.
236         */
237        protected void configureMessageBroker(MessageBrokerRegistry registry) {
238        }
239
240        /**
241         * Provide access to the configured PatchMatcher for access from other
242         * configuration classes.
243         */
244        public final PathMatcher getPathMatcher() {
245                return getBrokerRegistry().getPathMatcher();
246        }
247
248        @Bean
249        public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
250                SimpAnnotationMethodMessageHandler handler = createAnnotationMethodMessageHandler();
251                handler.setDestinationPrefixes(getBrokerRegistry().getApplicationDestinationPrefixes());
252                handler.setMessageConverter(brokerMessageConverter());
253                handler.setValidator(simpValidator());
254
255                List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<HandlerMethodArgumentResolver>();
256                addArgumentResolvers(argumentResolvers);
257                handler.setCustomArgumentResolvers(argumentResolvers);
258
259                List<HandlerMethodReturnValueHandler> returnValueHandlers = new ArrayList<HandlerMethodReturnValueHandler>();
260                addReturnValueHandlers(returnValueHandlers);
261                handler.setCustomReturnValueHandlers(returnValueHandlers);
262
263                PathMatcher pathMatcher = getBrokerRegistry().getPathMatcher();
264                if (pathMatcher != null) {
265                        handler.setPathMatcher(pathMatcher);
266                }
267                return handler;
268        }
269
270        /**
271         * Protected method for plugging in a custom subclass of
272         * {@link org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler
273         * SimpAnnotationMethodMessageHandler}.
274         * @since 4.2
275         */
276        protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
277                return new SimpAnnotationMethodMessageHandler(clientInboundChannel(),
278                                clientOutboundChannel(), brokerMessagingTemplate());
279        }
280
281        protected void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
282        }
283
284        protected void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
285        }
286
287        @Bean
288        public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
289                SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker(brokerChannel());
290                if (handler == null) {
291                        return new NoOpBrokerMessageHandler();
292                }
293                updateUserDestinationResolver(handler);
294                return handler;
295        }
296
297        private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler) {
298                Collection<String> prefixes = handler.getDestinationPrefixes();
299                if (!prefixes.isEmpty() && !prefixes.iterator().next().startsWith("/")) {
300                        ((DefaultUserDestinationResolver) userDestinationResolver()).setRemoveLeadingSlash(true);
301                }
302        }
303
304        @Bean
305        public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
306                StompBrokerRelayMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
307                if (handler == null) {
308                        return new NoOpBrokerMessageHandler();
309                }
310                Map<String, MessageHandler> subscriptions = new HashMap<String, MessageHandler>(1);
311                String destination = getBrokerRegistry().getUserDestinationBroadcast();
312                if (destination != null) {
313                        subscriptions.put(destination, userDestinationMessageHandler());
314                }
315                destination = getBrokerRegistry().getUserRegistryBroadcast();
316                if (destination != null) {
317                        subscriptions.put(destination, userRegistryMessageHandler());
318                }
319                handler.setSystemSubscriptions(subscriptions);
320                updateUserDestinationResolver(handler);
321                return handler;
322        }
323
324        @Bean
325        public UserDestinationMessageHandler userDestinationMessageHandler() {
326                UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel(),
327                                brokerChannel(), userDestinationResolver());
328                String destination = getBrokerRegistry().getUserDestinationBroadcast();
329                handler.setBroadcastDestination(destination);
330                return handler;
331        }
332
333        @Bean
334        public MessageHandler userRegistryMessageHandler() {
335                if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
336                        return new NoOpMessageHandler();
337                }
338                SimpUserRegistry userRegistry = userRegistry();
339                Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
340                return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
341                                brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
342                                messageBrokerTaskScheduler());
343        }
344
345        // Expose alias for 4.1 compatibility
346        @Bean(name = {"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"})
347        public ThreadPoolTaskScheduler messageBrokerTaskScheduler() {
348                ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
349                scheduler.setThreadNamePrefix("MessageBroker-");
350                scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
351                scheduler.setRemoveOnCancelPolicy(true);
352                return scheduler;
353        }
354
355        @Bean
356        public SimpMessagingTemplate brokerMessagingTemplate() {
357                SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
358                String prefix = getBrokerRegistry().getUserDestinationPrefix();
359                if (prefix != null) {
360                        template.setUserDestinationPrefix(prefix);
361                }
362                template.setMessageConverter(brokerMessageConverter());
363                return template;
364        }
365
366        @Bean
367        public CompositeMessageConverter brokerMessageConverter() {
368                List<MessageConverter> converters = new ArrayList<MessageConverter>();
369                boolean registerDefaults = configureMessageConverters(converters);
370                if (registerDefaults) {
371                        converters.add(new StringMessageConverter());
372                        converters.add(new ByteArrayMessageConverter());
373                        if (jackson2Present) {
374                                converters.add(createJacksonConverter());
375                        }
376                }
377                return new CompositeMessageConverter(converters);
378        }
379
380        protected MappingJackson2MessageConverter createJacksonConverter() {
381                DefaultContentTypeResolver resolver = new DefaultContentTypeResolver();
382                resolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON);
383                MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
384                converter.setContentTypeResolver(resolver);
385                return converter;
386        }
387
388        /**
389         * Override this method to add custom message converters.
390         * @param messageConverters the list to add converters to, initially empty
391         * @return {@code true} if default message converters should be added to list,
392         * {@code false} if no more converters should be added
393         */
394        protected boolean configureMessageConverters(List<MessageConverter> messageConverters) {
395                return true;
396        }
397
398        @Bean
399        public UserDestinationResolver userDestinationResolver() {
400                DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userRegistry());
401                String prefix = getBrokerRegistry().getUserDestinationPrefix();
402                if (prefix != null) {
403                        resolver.setUserDestinationPrefix(prefix);
404                }
405                return resolver;
406        }
407
408        @Bean
409        public SimpUserRegistry userRegistry() {
410                return (getBrokerRegistry().getUserRegistryBroadcast() != null ?
411                                new MultiServerUserRegistry(createLocalUserRegistry()) : createLocalUserRegistry());
412        }
413
414        /**
415         * Create the user registry that provides access to local users.
416         */
417        protected abstract SimpUserRegistry createLocalUserRegistry();
418
419        /**
420         * As of 4.2, {@code UserSessionRegistry} is deprecated in favor of {@link SimpUserRegistry}
421         * exposing information about all connected users. The {@link MultiServerUserRegistry}
422         * implementation in combination with {@link UserRegistryMessageHandler} can be used
423         * to share user registries across multiple servers.
424         */
425        @Deprecated
426        @SuppressWarnings("deprecation")
427        protected org.springframework.messaging.simp.user.UserSessionRegistry userSessionRegistry() {
428                return null;
429        }
430
431        /**
432         * Return a {@link org.springframework.validation.Validator}s instance for validating
433         * {@code @Payload} method arguments.
434         * <p>In order, this method tries to get a Validator instance:
435         * <ul>
436         * <li>delegating to getValidator() first</li>
437         * <li>if none returned, getting an existing instance with its well-known name "mvcValidator",
438         * created by an MVC configuration</li>
439         * <li>if none returned, checking the classpath for the presence of a JSR-303 implementation
440         * before creating a {@code OptionalValidatorFactoryBean}</li>
441         * <li>returning a no-op Validator instance</li>
442         * </ul>
443         */
444        protected Validator simpValidator() {
445                Validator validator = getValidator();
446                if (validator == null) {
447                        if (this.applicationContext.containsBean(MVC_VALIDATOR_NAME)) {
448                                validator = this.applicationContext.getBean(MVC_VALIDATOR_NAME, Validator.class);
449                        }
450                        else if (ClassUtils.isPresent("javax.validation.Validator", getClass().getClassLoader())) {
451                                Class<?> clazz;
452                                try {
453                                        String className = "org.springframework.validation.beanvalidation.OptionalValidatorFactoryBean";
454                                        clazz = ClassUtils.forName(className, AbstractMessageBrokerConfiguration.class.getClassLoader());
455                                }
456                                catch (Throwable ex) {
457                                        throw new BeanInitializationException("Could not find default validator class", ex);
458                                }
459                                validator = (Validator) BeanUtils.instantiateClass(clazz);
460                        }
461                        else {
462                                validator = new Validator() {
463                                        @Override
464                                        public boolean supports(Class<?> clazz) {
465                                                return false;
466                                        }
467                                        @Override
468                                        public void validate(Object target, Errors errors) {
469                                        }
470                                };
471                        }
472                }
473                return validator;
474        }
475
476        /**
477         * Override this method to provide a custom {@link Validator}.
478         * @since 4.0.1
479         */
480        public Validator getValidator() {
481                return null;
482        }
483
484
485        private static class NoOpMessageHandler implements MessageHandler {
486
487                @Override
488                public void handleMessage(Message<?> message) {
489                }
490        }
491
492
493        private class NoOpBrokerMessageHandler extends AbstractBrokerMessageHandler {
494
495                public NoOpBrokerMessageHandler() {
496                        super(clientInboundChannel(), clientOutboundChannel(), brokerChannel());
497                }
498
499                @Override
500                public void start() {
501                }
502
503                @Override
504                public void stop() {
505                }
506
507                @Override
508                public void handleMessage(Message<?> message) {
509                }
510
511                @Override
512                protected void handleMessageInternal(Message<?> message) {
513                }
514        }
515
516}