/*
 * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.messaging.simp.config;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.task.TaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.SimpLogging;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
import org.springframework.messaging.simp.user.MultiServerUserRegistry;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.messaging.simp.user.UserDestinationMessageHandler;
import org.springframework.messaging.simp.user.UserDestinationResolver;
import org.springframework.messaging.simp.user.UserRegistryMessageHandler;
import org.springframework.messaging.support.AbstractSubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.ImmutableMessageChannelInterceptor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.PathMatcher;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;

/**
 * Provides essential configuration for handling messages with simple messaging
 * protocols such as STOMP.
 *
 * <p>{@link #clientInboundChannel()} and {@link #clientOutboundChannel()} deliver
 * messages to and from remote clients to several message handlers such as the
 * following.
 * <ul>
 * <li>{@link #simpAnnotationMethodMessageHandler()}</li>
 * <li>{@link #simpleBrokerMessageHandler()}</li>
 * <li>{@link #stompBrokerRelayMessageHandler()}</li>
 * <li>{@link #userDestinationMessageHandler()}</li>
 * </ul>
 *
 * <p>{@link #brokerChannel()} delivers messages from within the application to
 * the respective message handlers. {@link #brokerMessagingTemplate()} can be injected
 * into any application component to send messages.
 *
 * <p>Subclasses are responsible for the parts of the configuration that feed messages
 * to and from the client inbound/outbound channels (e.g. STOMP over WebSocket).
 *
 * @author Rossen Stoyanchev
 * @author Brian Clozel
 * @since 4.0
 */
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {

	/** Bean name under which {@link #simpValidator()} looks for an existing validator. */
	private static final String MVC_VALIDATOR_NAME = "mvcValidator";

	/** Whether the Jackson 2 {@code ObjectMapper} class is visible to this class' loader. */
	private static final boolean jackson2Present = ClassUtils.isPresent(
			"com.fasterxml.jackson.databind.ObjectMapper", AbstractMessageBrokerConfiguration.class.getClassLoader());


	@Nullable
	private ApplicationContext applicationContext;

	/** Lazily created by {@link #getClientInboundChannelRegistration()}. */
	@Nullable
	private ChannelRegistration clientInboundChannelRegistration;

	/** Lazily created by {@link #getClientOutboundChannelRegistration()}. */
	@Nullable
	private ChannelRegistration clientOutboundChannelRegistration;

	/** Lazily created by {@link #getBrokerRegistry()}. */
	@Nullable
	private MessageBrokerRegistry brokerRegistry;


	/**
	 * Protected constructor.
	 */
	protected AbstractMessageBrokerConfiguration() {
	}


	@Override
	public void setApplicationContext(@Nullable ApplicationContext applicationContext) {
		this.applicationContext = applicationContext;
	}

	/**
	 * Return the application context set via
	 * {@link #setApplicationContext(ApplicationContext)}, if any.
	 */
	@Nullable
	public ApplicationContext getApplicationContext() {
		return this.applicationContext;
	}


	/**
	 * Create the channel for inbound client messages: an
	 * {@link ExecutorSubscribableChannel} backed by
	 * {@link #clientInboundChannelExecutor()}, with the interceptors of the
	 * inbound {@link ChannelRegistration} applied when it has any.
	 */
	@Bean
	public AbstractSubscribableChannel clientInboundChannel() {
		ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
		channel.setLogger(SimpLogging.forLog(channel.getLogger()));
		ChannelRegistration reg = getClientInboundChannelRegistration();
		if (reg.hasInterceptors()) {
			channel.setInterceptors(reg.getInterceptors());
		}
		return channel;
	}

	/**
	 * Create the executor for {@link #clientInboundChannel()} from the inbound
	 * channel registration, using the thread name prefix
	 * {@code "clientInboundChannel-"}.
	 */
	@Bean
	public TaskExecutor clientInboundChannelExecutor() {
		TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor();
		ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
		executor.setThreadNamePrefix("clientInboundChannel-");
		return executor;
	}

	/**
	 * Return the inbound {@link ChannelRegistration}, creating it on first access.
	 * <p>On creation the registration is first handed to
	 * {@link #configureClientInboundChannel(ChannelRegistration)} and then an
	 * {@link ImmutableMessageChannelInterceptor} is registered on it; the result
	 * is cached for subsequent calls.
	 */
	protected final ChannelRegistration getClientInboundChannelRegistration() {
		if (this.clientInboundChannelRegistration == null) {
			ChannelRegistration registration = new ChannelRegistration();
			configureClientInboundChannel(registration);
			registration.interceptors(new ImmutableMessageChannelInterceptor());
			this.clientInboundChannelRegistration = registration;
		}
		return this.clientInboundChannelRegistration;
	}

	/**
	 * A hook for subclasses to customize the message channel for inbound messages
	 * from WebSocket clients.
	 * <p>The default implementation is empty.
	 * @param registration the registration to customize
	 */
	protected void configureClientInboundChannel(ChannelRegistration registration) {
	}

	/**
	 * Create the channel for outbound client messages: an
	 * {@link ExecutorSubscribableChannel} backed by
	 * {@link #clientOutboundChannelExecutor()}, with the interceptors of the
	 * outbound {@link ChannelRegistration} applied when it has any.
	 */
	@Bean
	public AbstractSubscribableChannel clientOutboundChannel() {
		ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
		channel.setLogger(SimpLogging.forLog(channel.getLogger()));
		ChannelRegistration reg = getClientOutboundChannelRegistration();
		if (reg.hasInterceptors()) {
			channel.setInterceptors(reg.getInterceptors());
		}
		return channel;
	}

	/**
	 * Create the executor for {@link #clientOutboundChannel()} from the outbound
	 * channel registration, using the thread name prefix
	 * {@code "clientOutboundChannel-"}.
	 */
	@Bean
	public TaskExecutor clientOutboundChannelExecutor() {
		TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor();
		ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
		executor.setThreadNamePrefix("clientOutboundChannel-");
		return executor;
	}

	/**
	 * Return the outbound {@link ChannelRegistration}, creating it on first access.
	 * <p>On creation the registration is first handed to
	 * {@link #configureClientOutboundChannel(ChannelRegistration)} and then an
	 * {@link ImmutableMessageChannelInterceptor} is registered on it; the result
	 * is cached for subsequent calls.
	 */
	protected final ChannelRegistration getClientOutboundChannelRegistration() {
		if (this.clientOutboundChannelRegistration == null) {
			ChannelRegistration registration = new ChannelRegistration();
			configureClientOutboundChannel(registration);
			registration.interceptors(new ImmutableMessageChannelInterceptor());
			this.clientOutboundChannelRegistration = registration;
		}
		return this.clientOutboundChannelRegistration;
	}

	/**
	 * A hook for subclasses to customize the message channel for messages from
	 * the application or message broker to WebSocket clients.
	 * <p>The default implementation is empty.
	 * @param registration the registration to customize
	 */
	protected void configureClientOutboundChannel(ChannelRegistration registration) {
	}

	/**
	 * Create the broker channel.
	 * <p>The channel is backed by {@link #brokerChannelExecutor()} only when the
	 * broker channel registration has a task executor; otherwise an
	 * {@link ExecutorSubscribableChannel} without an executor is created. An
	 * {@link ImmutableMessageChannelInterceptor} is registered on the
	 * registration before its interceptors are applied to the channel.
	 */
	@Bean
	public AbstractSubscribableChannel brokerChannel() {
		ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
		ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
				new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel());
		reg.interceptors(new ImmutableMessageChannelInterceptor());
		channel.setLogger(SimpLogging.forLog(channel.getLogger()));
		channel.setInterceptors(reg.getInterceptors());
		return channel;
	}

	/**
	 * Create the executor for {@link #brokerChannel()}, using the thread name
	 * prefix {@code "brokerChannel-"}.
	 * <p>When the broker channel registration has no task executor, a minimal
	 * placeholder executor is returned instead; {@link #brokerChannel()} does not
	 * use the executor in that case.
	 */
	@Bean
	public TaskExecutor brokerChannelExecutor() {
		ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
		ThreadPoolTaskExecutor executor;
		if (reg.hasTaskExecutor()) {
			executor = reg.taskExecutor().getTaskExecutor();
		}
		else {
			// Should never be used
			executor = new ThreadPoolTaskExecutor();
			executor.setCorePoolSize(0);
			executor.setMaxPoolSize(1);
			executor.setQueueCapacity(0);
		}
		executor.setThreadNamePrefix("brokerChannel-");
		return executor;
	}

	/**
	 * An accessor for the {@link MessageBrokerRegistry} that ensures its one-time creation
	 * and initialization through {@link #configureMessageBroker(MessageBrokerRegistry)}.
	 */
	protected final MessageBrokerRegistry getBrokerRegistry() {
		if (this.brokerRegistry == null) {
			MessageBrokerRegistry registry = new MessageBrokerRegistry(clientInboundChannel(), clientOutboundChannel());
			configureMessageBroker(registry);
			this.brokerRegistry = registry;
		}
		return this.brokerRegistry;
	}

	/**
	 * A hook for subclasses to customize message broker configuration through the
	 * provided {@link MessageBrokerRegistry} instance.
	 * <p>The default implementation is empty.
	 * @param registry the registry to customize
	 */
	protected void configureMessageBroker(MessageBrokerRegistry registry) {
	}

	/**
	 * Provide access to the configured {@link PathMatcher} for access from other
	 * configuration classes.
	 * @return the path matcher of the broker registry, possibly {@code null}
	 */
	@Nullable
	public final PathMatcher getPathMatcher() {
		return getBrokerRegistry().getPathMatcher();
	}

	/**
	 * Create the handler for annotated message-handling methods.
	 * <p>The handler obtained from {@link #createAnnotationMethodMessageHandler()}
	 * is configured with the application destination prefixes of the broker
	 * registry, {@link #brokerMessageConverter()}, {@link #simpValidator()}, the
	 * custom argument resolvers and return value handlers contributed through
	 * {@link #addArgumentResolvers(List)} and {@link #addReturnValueHandlers(List)},
	 * and the registry's {@link PathMatcher} when one is set.
	 */
	@Bean
	public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler() {
		SimpAnnotationMethodMessageHandler handler = createAnnotationMethodMessageHandler();
		handler.setDestinationPrefixes(getBrokerRegistry().getApplicationDestinationPrefixes());
		handler.setMessageConverter(brokerMessageConverter());
		handler.setValidator(simpValidator());

		List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
		addArgumentResolvers(argumentResolvers);
		handler.setCustomArgumentResolvers(argumentResolvers);

		List<HandlerMethodReturnValueHandler> returnValueHandlers = new ArrayList<>();
		addReturnValueHandlers(returnValueHandlers);
		handler.setCustomReturnValueHandlers(returnValueHandlers);

		PathMatcher pathMatcher = getBrokerRegistry().getPathMatcher();
		if (pathMatcher != null) {
			handler.setPathMatcher(pathMatcher);
		}
		return handler;
	}

	/**
	 * Protected method for plugging in a custom subclass of
	 * {@link org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler
	 * SimpAnnotationMethodMessageHandler}.
	 * @since 4.2
	 */
	protected SimpAnnotationMethodMessageHandler createAnnotationMethodMessageHandler() {
		return new SimpAnnotationMethodMessageHandler(clientInboundChannel(),
				clientOutboundChannel(), brokerMessagingTemplate());
	}

	/**
	 * A hook for subclasses to contribute custom argument resolvers for
	 * {@link #simpAnnotationMethodMessageHandler()}.
	 * <p>The default implementation is empty.
	 * @param argumentResolvers the list to add resolvers to, initially empty
	 */
	protected void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
	}

	/**
	 * A hook for subclasses to contribute custom return value handlers for
	 * {@link #simpAnnotationMethodMessageHandler()}.
	 * <p>The default implementation is empty.
	 * @param returnValueHandlers the list to add handlers to, initially empty
	 */
	protected void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
	}

	/**
	 * Return the simple broker handler obtained from the broker registry for
	 * {@link #brokerChannel()}, or {@code null} when the registry provides none.
	 */
	@Bean
	@Nullable
	public AbstractBrokerMessageHandler simpleBrokerMessageHandler() {
		SimpleBrokerMessageHandler handler = getBrokerRegistry().getSimpleBroker(brokerChannel());
		if (handler == null) {
			return null;
		}
		updateUserDestinationResolver(handler);
		return handler;
	}

	/**
	 * Switch the user destination resolver to "remove leading slash" mode when the
	 * first destination prefix of the given broker handler does not start with
	 * {@code "/"}.
	 * <p>The flag only exists on {@link DefaultUserDestinationResolver}; a resolver
	 * of any other type (e.g. supplied by a subclass overriding
	 * {@link #userDestinationResolver()}) is left untouched rather than causing
	 * a {@link ClassCastException}.
	 */
	private void updateUserDestinationResolver(AbstractBrokerMessageHandler handler) {
		Collection<String> prefixes = handler.getDestinationPrefixes();
		if (!prefixes.isEmpty() && !prefixes.iterator().next().startsWith("/")) {
			UserDestinationResolver resolver = userDestinationResolver();
			if (resolver instanceof DefaultUserDestinationResolver) {
				((DefaultUserDestinationResolver) resolver).setRemoveLeadingSlash(true);
			}
		}
	}

	/**
	 * Return the STOMP broker relay handler obtained from the broker registry for
	 * {@link #brokerChannel()}, or {@code null} when the registry provides none.
	 * <p>When present, the relay is given system subscriptions mapping the user
	 * destination broadcast and user registry broadcast destinations (each only
	 * if configured) to {@link #userDestinationMessageHandler()} and
	 * {@link #userRegistryMessageHandler()} respectively.
	 */
	@Bean
	@Nullable
	public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
		StompBrokerRelayMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
		if (handler == null) {
			return null;
		}
		Map<String, MessageHandler> subscriptions = new HashMap<>(4);
		String destination = getBrokerRegistry().getUserDestinationBroadcast();
		if (destination != null) {
			subscriptions.put(destination, userDestinationMessageHandler());
		}
		destination = getBrokerRegistry().getUserRegistryBroadcast();
		if (destination != null) {
			subscriptions.put(destination, userRegistryMessageHandler());
		}
		handler.setSystemSubscriptions(subscriptions);
		updateUserDestinationResolver(handler);
		return handler;
	}

	/**
	 * Create the handler for user destinations, wired with
	 * {@link #clientInboundChannel()}, {@link #brokerChannel()} and
	 * {@link #userDestinationResolver()}; its broadcast destination is set when
	 * the broker registry defines one.
	 */
	@Bean
	public UserDestinationMessageHandler userDestinationMessageHandler() {
		UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel(),
				brokerChannel(), userDestinationResolver());
		String destination = getBrokerRegistry().getUserDestinationBroadcast();
		if (destination != null) {
			handler.setBroadcastDestination(destination);
		}
		return handler;
	}

	/**
	 * Create the handler for user registry broadcasts, or return {@code null}
	 * when the broker registry defines no user registry broadcast destination.
	 * <p>Requires {@link #userRegistry()} to be a {@link MultiServerUserRegistry};
	 * the assertion fails otherwise.
	 */
	@Bean
	@Nullable
	public MessageHandler userRegistryMessageHandler() {
		if (getBrokerRegistry().getUserRegistryBroadcast() == null) {
			return null;
		}
		SimpUserRegistry userRegistry = userRegistry();
		Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required");
		return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry,
				brokerMessagingTemplate(), getBrokerRegistry().getUserRegistryBroadcast(),
				messageBrokerTaskScheduler());
	}

	/**
	 * Create the task scheduler for the message broker: a
	 * {@link ThreadPoolTaskScheduler} with the thread name prefix
	 * {@code "MessageBroker-"}, a pool size equal to the number of available
	 * processors, and remove-on-cancel enabled.
	 */
	// Expose alias for 4.1 compatibility
	@Bean(name = {"messageBrokerTaskScheduler", "messageBrokerSockJsTaskScheduler"})
	public TaskScheduler messageBrokerTaskScheduler() {
		ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
		scheduler.setThreadNamePrefix("MessageBroker-");
		scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
		scheduler.setRemoveOnCancelPolicy(true);
		return scheduler;
	}

	/**
	 * Create the messaging template that sends to {@link #brokerChannel()},
	 * using {@link #brokerMessageConverter()} and, when configured on the broker
	 * registry, its user destination prefix.
	 */
	@Bean
	public SimpMessagingTemplate brokerMessagingTemplate() {
		SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel());
		String prefix = getBrokerRegistry().getUserDestinationPrefix();
		if (prefix != null) {
			template.setUserDestinationPrefix(prefix);
		}
		template.setMessageConverter(brokerMessageConverter());
		return template;
	}

	/**
	 * Create the composite message converter.
	 * <p>Converters contributed through {@link #configureMessageConverters(List)}
	 * come first. If that method returns {@code true}, a
	 * {@link StringMessageConverter}, a {@link ByteArrayMessageConverter} and,
	 * when Jackson 2 is present, the converter from
	 * {@link #createJacksonConverter()} are added after them.
	 */
	@Bean
	public CompositeMessageConverter brokerMessageConverter() {
		List<MessageConverter> converters = new ArrayList<>();
		boolean registerDefaults = configureMessageConverters(converters);
		if (registerDefaults) {
			converters.add(new StringMessageConverter());
			converters.add(new ByteArrayMessageConverter());
			if (jackson2Present) {
				converters.add(createJacksonConverter());
			}
		}
		return new CompositeMessageConverter(converters);
	}

	/**
	 * Create the Jackson 2 converter registered by default, using a content type
	 * resolver whose default MIME type is {@code application/json}.
	 */
	protected MappingJackson2MessageConverter createJacksonConverter() {
		DefaultContentTypeResolver resolver = new DefaultContentTypeResolver();
		resolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON);
		MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
		converter.setContentTypeResolver(resolver);
		return converter;
	}

	/**
	 * Override this method to add custom message converters.
	 * @param messageConverters the list to add converters to, initially empty
	 * @return {@code true} if default message converters should be added to list,
	 * {@code false} if no more converters should be added
	 */
	protected boolean configureMessageConverters(List<MessageConverter> messageConverters) {
		return true;
	}

	/**
	 * Create the user destination resolver: a
	 * {@link DefaultUserDestinationResolver} over {@link #userRegistry()}, with
	 * the user destination prefix of the broker registry applied when set.
	 */
	@Bean
	public UserDestinationResolver userDestinationResolver() {
		DefaultUserDestinationResolver resolver = new DefaultUserDestinationResolver(userRegistry());
		String prefix = getBrokerRegistry().getUserDestinationPrefix();
		if (prefix != null) {
			resolver.setUserDestinationPrefix(prefix);
		}
		return resolver;
	}

	/**
	 * Create the user registry.
	 * <p>The deprecated {@link #createLocalUserRegistry()} is consulted first; if
	 * it returns {@code null}, {@link #createLocalUserRegistry(Integer)} is called
	 * with the user registry order of the broker registry. The local registry is
	 * wrapped in a {@link MultiServerUserRegistry} when the broker registry
	 * defines a user registry broadcast destination.
	 */
	@Bean
	@SuppressWarnings("deprecation")
	public SimpUserRegistry userRegistry() {
		SimpUserRegistry registry = createLocalUserRegistry();
		if (registry == null) {
			registry = createLocalUserRegistry(getBrokerRegistry().getUserRegistryOrder());
		}
		boolean broadcast = getBrokerRegistry().getUserRegistryBroadcast() != null;
		return (broadcast ? new MultiServerUserRegistry(registry) : registry);
	}

	/**
	 * Create the user registry that provides access to local users.
	 * @deprecated as of 5.1 in favor of {@link #createLocalUserRegistry(Integer)}
	 */
	@Deprecated
	@Nullable
	protected SimpUserRegistry createLocalUserRegistry() {
		return null;
	}

	/**
	 * Create the user registry that provides access to local users.
	 * @param order the order to use as a {@link SmartApplicationListener}.
	 * @since 5.1
	 */
	protected abstract SimpUserRegistry createLocalUserRegistry(@Nullable Integer order);

	/**
	 * Return an {@link org.springframework.validation.Validator} instance for
	 * validating {@code @Payload} method arguments.
	 * <p>In order, this method tries to get a Validator instance:
	 * <ul>
	 * <li>delegating to getValidator() first</li>
	 * <li>if none returned, getting an existing instance with its well-known name "mvcValidator",
	 * created by an MVC configuration</li>
	 * <li>if none returned, checking the classpath for the presence of a JSR-303 implementation
	 * before creating a {@code OptionalValidatorFactoryBean}</li>
	 * <li>returning a no-op Validator instance</li>
	 * </ul>
	 * @throws BeanInitializationException if the default validator class cannot be loaded
	 */
	protected Validator simpValidator() {
		Validator validator = getValidator();
		if (validator == null) {
			if (this.applicationContext != null && this.applicationContext.containsBean(MVC_VALIDATOR_NAME)) {
				validator = this.applicationContext.getBean(MVC_VALIDATOR_NAME, Validator.class);
			}
			else if (ClassUtils.isPresent("javax.validation.Validator", getClass().getClassLoader())) {
				Class<?> clazz;
				try {
					String className = "org.springframework.validation.beanvalidation.OptionalValidatorFactoryBean";
					clazz = ClassUtils.forName(className, AbstractMessageBrokerConfiguration.class.getClassLoader());
				}
				catch (Throwable ex) {
					throw new BeanInitializationException("Could not find default validator class", ex);
				}
				validator = (Validator) BeanUtils.instantiateClass(clazz);
			}
			else {
				validator = new NoOpValidator();
			}
		}
		return validator;
	}

	/**
	 * Override this method to provide a custom {@link Validator}.
	 * @since 4.0.1
	 */
	@Nullable
	public Validator getValidator() {
		return null;
	}


	/**
	 * Fallback {@link Validator} that supports no types and performs no validation.
	 * <p>Declared as a static nested class so that it holds no reference to the
	 * enclosing configuration instance.
	 */
	private static final class NoOpValidator implements Validator {

		@Override
		public boolean supports(Class<?> clazz) {
			return false;
		}

		@Override
		public void validate(@Nullable Object target, Errors errors) {
		}
	}

}