/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.simp.config;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
import org.springframework.messaging.simp.user.MultiServerUserRegistry;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.messaging.simp.user.UserDestinationMessageHandler;
import org.springframework.messaging.simp.user.UserDestinationResolver;
import org.springframework.messaging.simp.user.UserRegistryMessageHandler;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.ImmutableMessageChannelInterceptor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.PathMatcher;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;

/**
 * Provides essential configuration for handling messages with simple messaging
 * protocols such as STOMP.
 *
 * <p>{@link #clientInboundChannel()} and {@link #clientOutboundChannel()} deliver
 * messages to and from remote clients to several message handlers such as the
 * following.
 * <ul>
 * <li>{@link #simpAnnotationMethodMessageHandler()}</li>
 * <li>{@link #simpleBrokerMessageHandler()}</li>
 * <li>{@link #stompBrokerRelayMessageHandler()}</li>
 * <li>{@link #userDestinationMessageHandler()}</li>
 * </ul>
 *
 * <p>{@link #brokerChannel()} delivers messages from within the application to
 * the respective message handlers. {@link #brokerMessagingTemplate()} can be injected
 * into any application component to send messages.
 *
 * <p>Subclasses are responsible for the parts of the configuration that feed messages
 * to and from the client inbound/outbound channels (e.g. STOMP over WebSocket).
 *
 * @author Rossen Stoyanchev
 * @author Brian Clozel
 * @since 4.0
 */
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {

	/** Bean name under which {@link #simpValidator()} looks for an existing validator. */
	private static final String MVC_VALIDATOR_NAME = "mvcValidator";

	/** Whether Jackson 2 is on the classpath; gates the default JSON converter. */
	private static final boolean jackson2Present = ClassUtils.isPresent(
			"com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader());


	private ApplicationContext applicationContext;

	// Lazily created by getClientInboundChannelRegistration()
	private ChannelRegistration clientInboundChannelRegistration;

	// Lazily created by getClientOutboundChannelRegistration()
	private ChannelRegistration clientOutboundChannelRegistration;

	// Lazily created by getBrokerRegistry()
	private MessageBrokerRegistry brokerRegistry;


	/**
	 * Protected constructor.
	 */
	protected AbstractMessageBrokerConfiguration() {
	}


	/**
	 * Store the given {@link ApplicationContext} for later lookups
	 * (see {@link #simpValidator()}).
	 */
	@Override
	public void setApplicationContext(ApplicationContext applicationContext) {
		this.applicationContext = applicationContext;
	}

	/**
	 * Return the {@link ApplicationContext} previously passed to
	 * {@link #setApplicationContext}, or {@code null} if none was set.
	 */
	public ApplicationContext getApplicationContext() {
		return this.applicationContext;
	}


	/**
	 * Create the channel for inbound client messages, backed by
	 * {@link #clientInboundChannelExecutor()} and configured with the interceptors
	 * from {@link #getClientInboundChannelRegistration()}, if any.
	 */
	@Bean
	public AbstractSubscribableChannel clientInboundChannel() {
		ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
		ChannelRegistration reg = getClientInboundChannelRegistration();
		if (reg.hasInterceptors()) {
			channel.setInterceptors(reg.getInterceptors());
		}
		return channel;
	}

	/**
	 * Obtain the task executor from the inbound channel registration and
	 * assign it the thread name prefix {@code "clientInboundChannel-"}.
	 */
	@Bean
	public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
		TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor();
		ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
		executor.setThreadNamePrefix("clientInboundChannel-");
		return executor;
	}

	/**
	 * An accessor for the inbound {@link ChannelRegistration} that creates it on
	 * first access, passes it to {@link #configureClientInboundChannel(ChannelRegistration)},
	 * and then registers an {@link ImmutableMessageChannelInterceptor} on it.
	 */
	protected final ChannelRegistration getClientInboundChannelRegistration() {
		if (this.clientInboundChannelRegistration == null) {
			ChannelRegistration registration = new ChannelRegistration();
			configureClientInboundChannel(registration);
			// Registered after the subclass hook has run
			registration.interceptors(new ImmutableMessageChannelInterceptor());
			this.clientInboundChannelRegistration = registration;
		}
		return this.clientInboundChannelRegistration;
	}

	/**
	 * A hook for subclasses to customize the message channel for inbound messages
	 * from WebSocket clients.
	 */
	protected void configureClientInboundChannel(ChannelRegistration registration) {
	}

	/**
	 * Create the channel for outbound client messages, backed by
	 * {@link #clientOutboundChannelExecutor()} and configured with the interceptors
	 * from {@link #getClientOutboundChannelRegistration()}, if any.
	 */
	@Bean
	public AbstractSubscribableChannel clientOutboundChannel() {
		ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
		ChannelRegistration reg = getClientOutboundChannelRegistration();
		if (reg.hasInterceptors()) {
			channel.setInterceptors(reg.getInterceptors());
		}
		return channel;
	}

	/**
	 * Obtain the task executor from the outbound channel registration and
	 * assign it the thread name prefix {@code "clientOutboundChannel-"}.
	 */
	@Bean
	public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
		TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor();
		ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
		executor.setThreadNamePrefix("clientOutboundChannel-");
		return executor;
	}

	/**
	 * An accessor for the outbound {@link ChannelRegistration} that creates it on
	 * first access, passes it to {@link #configureClientOutboundChannel(ChannelRegistration)},
	 * and then registers an {@link ImmutableMessageChannelInterceptor} on it.
	 */
	protected final ChannelRegistration getClientOutboundChannelRegistration() {
		if (this.clientOutboundChannelRegistration == null) {
			ChannelRegistration registration = new ChannelRegistration();
			configureClientOutboundChannel(registration);
			// Registered after the subclass hook has run
			registration.interceptors(new ImmutableMessageChannelInterceptor());
			this.clientOutboundChannelRegistration = registration;
		}
		return this.clientOutboundChannelRegistration;
	}

	/**
	 * A hook for subclasses to customize the message channel for messages from
	 * the application or message broker to WebSocket clients.
	 */
	protected void configureClientOutboundChannel(ChannelRegistration registration) {
	}

	/**
	 * Create the broker channel. It is backed by {@link #brokerChannelExecutor()}
	 * only if the broker channel registration has a task executor; otherwise the
	 * channel is created without an executor. An
	 * {@link ImmutableMessageChannelInterceptor} is registered in either case.
	 */
	@Bean
	public AbstractSubscribableChannel brokerChannel() {
		ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
		ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
				new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel());
		reg.interceptors(new ImmutableMessageChannelInterceptor());
		channel.setInterceptors(reg.getInterceptors());
		return channel;
	}

	/**
	 * Return the task executor from the broker channel registration if one is
	 * configured, or otherwise a placeholder executor (core pool size 0, max pool
	 * size 1, queue capacity 0). The thread name prefix is set to
	 * {@code "brokerChannel-"} in both cases.
	 */
	@Bean
	public ThreadPoolTaskExecutor brokerChannelExecutor() {
		ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
		ThreadPoolTaskExecutor executor;
		if (reg.hasTaskExecutor()) {
			executor = reg.taskExecutor().getTaskExecutor();
		}
		else {
			// Should never be used
			executor = new ThreadPoolTaskExecutor();
			executor.setCorePoolSize(0);
			executor.setMaxPoolSize(1);
			executor.setQueueCapacity(0);
		}
		executor.setThreadNamePrefix("brokerChannel-");
		return executor;
	}

	/**
	 * An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation
	 * and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}.
	 */
	protected final MessageBrokerRegistry getBrokerRegistry() {
		if (this.brokerRegistry == null) {
			MessageBrokerRegistry registry = new MessageBrokerRegistry(clientInboundChannel(), clientOutboundChannel());
			configureMessageBroker(registry);
			this.brokerRegistry = registry;
		}
		return this.brokerRegistry;
	}

	/**
	 * A hook for subclasses to customize message broker configuration through the
	 * provided {@link MessageBrokerRegistry} instance.
	 */
	protected void configureMessageBroker(MessageBrokerRegistry registry) {
	}

	/**
	 * Provide access to the configured {@link PathMatcher} for access from other
	 * configuration classes.
	 */
	public final PathMatcher getPathMatcher() {
		return getBrokerRegistry().getPathMatcher();
	}

	/**
	 * Create the annotated-method message handler and configure it with the
	 * application destination prefixes, the {@link #brokerMessageConverter()},
	 * the {@link #simpValidator()}, any custom argument resolvers and return value
	 * handlers contributed by subclasses, and the registry's {@link PathMatcher}
	 * when one is set.
	 */
	@Bean
	public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
		SimpAnnotationMethodMessageHandler handler = createAnnotationMethodMessageHandler();
		handler.setDestinationPrefixes(getBrokerRegistry().getApplicationDestinationPrefixes());
		handler.setMessageConverter(brokerMessageConverter());
		handler.setValidator(simpValidator());

		List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<HandlerMethodArgumentResolver>();
		addArgumentResolvers(argumentResolvers);
		handler.setCustomArgumentResolvers(argumentResolvers);

		List<HandlerMethodReturnValueHandler> returnValueHandlers = new ArrayList<HandlerMethodReturnValueHandler>();
		addReturnValueHandlers(returnValueHandlers);
		handler.setCustomReturnValueHandlers(returnValueHandlers);

		PathMatcher pathMatcher = getBrokerRegistry().getPathMatcher();
		if (pathMatcher != null) {
			handler.setPathMatcher(pathMatcher);
		}
		return handler;
	}

	/**
	 * Protected method for plugging in a custom subclass of
	 * {@link org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler
	 * SimpAnnotationMethodMessageHandler}.
	 * @since 4.2
	 */
	protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
		return new SimpAnnotationMethodMessageHandler(clientInboundChannel(),
				clientOutboundChannel(), brokerMessagingTemplate());
	}

	/**
	 * A hook for subclasses to add custom argument resolvers.
	 * @param argumentResolvers the list to add resolvers to, initially empty
	 */
	protected void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
	}

	/**
	 * A hook for subclasses to add custom return value handlers.
	 * @param returnValueHandlers the list to add handlers to, initially empty
	 */
	protected void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
	}

	/**
	 * Return the simple broker handler obtained from the {@link MessageBrokerRegistry},
	 * or a no-op handler if the registry does not provide one.
	 */
	@Bean
	public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
		SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker(brokerChannel());
		if (handler == null) {
			return new NoOpBrokerMessageHandler();
		}
		updateUserDestinationResolver(handler);
		return handler;
	}

	/**
	 * If the first destination prefix of the given handler does not start with
	 * {@code "/"}, set the "removeLeadingSlash" flag on the user destination resolver.
	 * <p>Note: the {@link #userDestinationResolver()} bean is cast to
	 * {@link DefaultUserDestinationResolver} here.
	 */
	private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler) {
		Collection<String> prefixes = handler.getDestinationPrefixes();
		if (!prefixes.isEmpty() && !prefixes.iterator().next().startsWith("/")) {
			((DefaultUserDestinationResolver) userDestinationResolver()).setRemoveLeadingSlash(true);
		}
	}

	/**
	 * Return the STOMP broker relay handler obtained from the {@link MessageBrokerRegistry},
	 * or a no-op handler if the registry does not provide one. The relay is given
	 * system subscriptions for the user destination broadcast and user registry
	 * broadcast destinations, for each of those that is configured.
	 */
	@Bean
	public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
		StompBrokerRelayMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
		if (handler == null) {
			return new NoOpBrokerMessageHandler();
		}
		Map<String, MessageHandler> subscriptions = new HashMap<String, MessageHandler>(1);
		String destination = getBrokerRegistry().getUserDestinationBroadcast();
		if (destination != null) {
			subscriptions.put(destination, userDestinationMessageHandler());
		}
		destination = getBrokerRegistry().getUserRegistryBroadcast();
		if (destination != null) {
			subscriptions.put(destination, userRegistryMessageHandler());
		}
		handler.setSystemSubscriptions(subscriptions);
		updateUserDestinationResolver(handler);
		return handler;
	}

	/**
	 * Create the handler for user destinations, wired with the client inbound
	 * channel, the broker channel and the {@link #userDestinationResolver()}, and
	 * configured with the registry's user destination broadcast (possibly {@code null}).
	 */
	@Bean
	public UserDestinationMessageHandler userDestinationMessageHandler() {
		UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel(),
				brokerChannel(), userDestinationResolver());
		String destination = getBrokerRegistry().getUserDestinationBroadcast();
		handler.setBroadcastDestination(destination);
		return handler;
	}

	/**
	 * Create the handler for user registry broadcasts, or a no-op handler if no
	 * user registry broadcast destination is configured.
	 * @throws IllegalArgumentException if a broadcast destination is configured but
	 * {@link #userRegistry()} is not a {@link MultiServerUserRegistry}
	 */
	@Bean
	public MessageHandler userRegistryMessageHandler() {
		if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
			return new NoOpMessageHandler();
		}
		SimpUserRegistry userRegistry = userRegistry();
		Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
		return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
				brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
				messageBrokerTaskScheduler());
	}

	/**
	 * Create the task scheduler used by the message broker configuration, with
	 * thread name prefix {@code "MessageBroker-"}, a pool size equal to the number
	 * of available processors, and the remove-on-cancel policy enabled.
	 */
	// Expose alias for 4.1 compatibility
	@Bean(name = {"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"})
	public ThreadPoolTaskScheduler messageBrokerTaskScheduler() {
		ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
		scheduler.setThreadNamePrefix("MessageBroker-");
		scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
		scheduler.setRemoveOnCancelPolicy(true);
		return scheduler;
	}

	/**
	 * Create the messaging template that sends to the {@link #brokerChannel()},
	 * using the {@link #brokerMessageConverter()} and, when configured, the
	 * registry's user destination prefix.
	 */
	@Bean
	public SimpMessagingTemplate brokerMessagingTemplate() {
		SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
		String prefix = getBrokerRegistry().getUserDestinationPrefix();
		if (prefix != null) {
			template.setUserDestinationPrefix(prefix);
		}
		template.setMessageConverter(brokerMessageConverter());
		return template;
	}

	/**
	 * Create the composite message converter. Converters contributed through
	 * {@link #configureMessageConverters(List)} come first; if that method returns
	 * {@code true}, a {@link StringMessageConverter}, a {@link ByteArrayMessageConverter}
	 * and (when Jackson 2 is present) the converter from
	 * {@link #createJacksonConverter()} are appended, in that order.
	 */
	@Bean
	public CompositeMessageConverter brokerMessageConverter() {
		List<MessageConverter> converters = new ArrayList<MessageConverter>();
		boolean registerDefaults = configureMessageConverters(converters);
		if (registerDefaults) {
			converters.add(new StringMessageConverter());
			converters.add(new ByteArrayMessageConverter());
			if (jackson2Present) {
				converters.add(createJacksonConverter());
			}
		}
		return new CompositeMessageConverter(converters);
	}

	/**
	 * Create the Jackson 2 message converter, configured with a content type
	 * resolver whose default MIME type is {@code application/json}.
	 */
	protected MappingJackson2MessageConverter createJacksonConverter() {
		DefaultContentTypeResolver resolver = new DefaultContentTypeResolver();
		resolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON);
		MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
		converter.setContentTypeResolver(resolver);
		return converter;
	}

	/**
	 * Override this method to add custom message converters.
	 * @param messageConverters the list to add converters to, initially empty
	 * @return {@code true} if default message converters should be added to list,
	 * {@code false} if no more converters should be added
	 */
	protected boolean configureMessageConverters(List<MessageConverter> messageConverters) {
		return true;
	}

	/**
	 * Create the user destination resolver on top of the {@link #userRegistry()},
	 * applying the registry's user destination prefix when one is configured.
	 */
	@Bean
	public UserDestinationResolver userDestinationResolver() {
		DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userRegistry());
		String prefix = getBrokerRegistry().getUserDestinationPrefix();
		if (prefix != null) {
			resolver.setUserDestinationPrefix(prefix);
		}
		return resolver;
	}

	/**
	 * Return the local user registry from {@link #createLocalUserRegistry()},
	 * wrapped in a {@link MultiServerUserRegistry} if a user registry broadcast
	 * destination is configured.
	 */
	@Bean
	public SimpUserRegistry userRegistry() {
		return (getBrokerRegistry().getUserRegistryBroadcast() != null ?
				new MultiServerUserRegistry(createLocalUserRegistry()) : createLocalUserRegistry());
	}

	/**
	 * Create the user registry that provides access to local users.
	 */
	protected abstract SimpUserRegistry createLocalUserRegistry();

	/**
	 * As of 4.2, {@code UserSessionRegistry} is deprecated in favor of {@link SimpUserRegistry}
	 * exposing information about all connected users. The {@link MultiServerUserRegistry}
	 * implementation in combination with {@link UserRegistryMessageHandler} can be used
	 * to share user registries across multiple servers.
	 * @return always {@code null} in this base implementation
	 * @deprecated as of 4.2, in favor of {@link SimpUserRegistry}
	 */
	@Deprecated
	@SuppressWarnings("deprecation")
	protected org.springframework.messaging.simp.user.UserSessionRegistry userSessionRegistry() {
		return null;
	}

	/**
	 * Return a {@link org.springframework.validation.Validator} instance for validating
	 * {@code @Payload} method arguments.
	 * <p>In order, this method tries to get a Validator instance:
	 * <ul>
	 * <li>delegating to getValidator() first</li>
	 * <li>if none returned, getting an existing instance with its well-known name "mvcValidator",
	 * created by an MVC configuration (skipped if no {@code ApplicationContext} has been set)</li>
	 * <li>if none returned, checking the classpath for the presence of a JSR-303 implementation
	 * before creating a {@code OptionalValidatorFactoryBean}</li>
	 * <li>returning a no-op Validator instance</li>
	 * </ul>
	 * @throws BeanInitializationException if the JSR-303 API is present but the
	 * default validator class cannot be loaded
	 */
	protected Validator simpValidator() {
		Validator validator = getValidator();
		if (validator == null) {
			// Guard against use before setApplicationContext: fall through to the other options
			if (this.applicationContext != null && this.applicationContext.containsBean(MVC_VALIDATOR_NAME)) {
				validator = this.applicationContext.getBean(MVC_VALIDATOR_NAME, Validator.class);
			}
			else if (ClassUtils.isPresent("javax.validation.Validator", getClass().getClassLoader())) {
				Class<?> clazz;
				try {
					String className = "org.springframework.validation.beanvalidation.OptionalValidatorFactoryBean";
					clazz = ClassUtils.forName(className, AbstractMessageBrokerConfiguration.class.getClassLoader());
				}
				catch (Throwable ex) {
					throw new BeanInitializationException("Could not find default validator class", ex);
				}
				validator = (Validator) BeanUtils.instantiateClass(clazz);
			}
			else {
				// No-op validator: supports nothing and validates nothing
				validator = new Validator() {
					@Override
					public boolean supports(Class<?> clazz) {
						return false;
					}
					@Override
					public void validate(Object target, Errors errors) {
					}
				};
			}
		}
		return validator;
	}

	/**
	 * Override this method to provide a custom {@link Validator}.
	 * @return the custom validator, or {@code null} (the default) to let
	 * {@link #simpValidator()} choose one
	 * @since 4.0.1
	 */
	public Validator getValidator() {
		return null;
	}


	/**
	 * A {@link MessageHandler} that ignores every message.
	 */
	private static class NoOpMessageHandler implements MessageHandler {

		@Override
		public void handleMessage(Message<?> message) {
		}
	}


	/**
	 * An {@link AbstractBrokerMessageHandler} whose start, stop and message
	 * handling methods do nothing. Non-static because its constructor obtains
	 * the channels from the enclosing configuration.
	 */
	private class NoOpBrokerMessageHandler extends AbstractBrokerMessageHandler {

		public NoOpBrokerMessageHandler() {
			super(clientInboundChannel(), clientOutboundChannel(), brokerChannel());
		}

		@Override
		public void start() {
		}

		@Override
		public void stop() {
		}

		@Override
		public void handleMessage(Message<?> message) {
		}

		@Override
		protected void handleMessageInternal(Message<?> message) {
		}
	}

}