001/* 002 * Copyright 2002-2018 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * https://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package org.springframework.messaging.simp.broker; 018 019import java.util.Collection; 020import java.util.Collections; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import org.apache.commons.logging.Log; 024 025import org.springframework.context.ApplicationEventPublisher; 026import org.springframework.context.ApplicationEventPublisherAware; 027import org.springframework.context.SmartLifecycle; 028import org.springframework.lang.Nullable; 029import org.springframework.messaging.Message; 030import org.springframework.messaging.MessageChannel; 031import org.springframework.messaging.MessageHandler; 032import org.springframework.messaging.SubscribableChannel; 033import org.springframework.messaging.simp.SimpLogging; 034import org.springframework.messaging.simp.SimpMessageHeaderAccessor; 035import org.springframework.messaging.simp.SimpMessageType; 036import org.springframework.messaging.support.ChannelInterceptor; 037import org.springframework.messaging.support.InterceptableChannel; 038import org.springframework.util.Assert; 039import org.springframework.util.CollectionUtils; 040 041/** 042 * Abstract base class for a {@link MessageHandler} that broker messages to 043 * registered subscribers. 044 * 045 * @author Rossen Stoyanchev 046 * @since 4.0 047 */ 048public abstract class AbstractBrokerMessageHandler 049 implements MessageHandler, ApplicationEventPublisherAware, SmartLifecycle { 050 051 protected final Log logger = SimpLogging.forLogName(getClass()); 052 053 private final SubscribableChannel clientInboundChannel; 054 055 private final MessageChannel clientOutboundChannel; 056 057 private final SubscribableChannel brokerChannel; 058 059 private final Collection<String> destinationPrefixes; 060 061 private boolean preservePublishOrder = false; 062 063 @Nullable 064 private ApplicationEventPublisher eventPublisher; 065 066 private AtomicBoolean brokerAvailable = new AtomicBoolean(false); 067 068 private final BrokerAvailabilityEvent availableEvent = new BrokerAvailabilityEvent(true, this); 069 070 private final BrokerAvailabilityEvent notAvailableEvent = new BrokerAvailabilityEvent(false, this); 071 072 private boolean autoStartup = true; 073 074 private volatile boolean running = false; 075 076 private final Object lifecycleMonitor = new Object(); 077 078 private final ChannelInterceptor unsentDisconnectInterceptor = new UnsentDisconnectChannelInterceptor(); 079 080 081 /** 082 * Constructor with no destination prefixes (matches all destinations). 083 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 084 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 085 * @param brokerChannel the channel for the application to send messages to the broker 086 */ 087 public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, 088 SubscribableChannel brokerChannel) { 089 090 this(inboundChannel, outboundChannel, brokerChannel, Collections.emptyList()); 091 } 092 093 /** 094 * Constructor with destination prefixes to match to destinations of messages. 095 * @param inboundChannel the channel for receiving messages from clients (e.g. WebSocket clients) 096 * @param outboundChannel the channel for sending messages to clients (e.g. WebSocket clients) 097 * @param brokerChannel the channel for the application to send messages to the broker 098 * @param destinationPrefixes prefixes to use to filter out messages 099 */ 100 public AbstractBrokerMessageHandler(SubscribableChannel inboundChannel, MessageChannel outboundChannel, 101 SubscribableChannel brokerChannel, @Nullable Collection<String> destinationPrefixes) { 102 103 Assert.notNull(inboundChannel, "'inboundChannel' must not be null"); 104 Assert.notNull(outboundChannel, "'outboundChannel' must not be null"); 105 Assert.notNull(brokerChannel, "'brokerChannel' must not be null"); 106 107 this.clientInboundChannel = inboundChannel; 108 this.clientOutboundChannel = outboundChannel; 109 this.brokerChannel = brokerChannel; 110 111 destinationPrefixes = (destinationPrefixes != null ? destinationPrefixes : Collections.emptyList()); 112 this.destinationPrefixes = Collections.unmodifiableCollection(destinationPrefixes); 113 } 114 115 116 public SubscribableChannel getClientInboundChannel() { 117 return this.clientInboundChannel; 118 } 119 120 public MessageChannel getClientOutboundChannel() { 121 return this.clientOutboundChannel; 122 } 123 124 public SubscribableChannel getBrokerChannel() { 125 return this.brokerChannel; 126 } 127 128 public Collection<String> getDestinationPrefixes() { 129 return this.destinationPrefixes; 130 } 131 132 /** 133 * Whether the client must receive messages in the order of publication. 134 * <p>By default messages sent to the {@code "clientOutboundChannel"} may 135 * not be processed in the same order because the channel is backed by a 136 * ThreadPoolExecutor that in turn does not guarantee processing in order. 137 * <p>When this flag is set to {@code true} messages within the same session 138 * will be sent to the {@code "clientOutboundChannel"} one at a time in 139 * order to preserve the order of publication. Enable this only if needed 140 * since there is some performance overhead to keep messages in order. 141 * @param preservePublishOrder whether to publish in order 142 * @since 5.1 143 */ 144 public void setPreservePublishOrder(boolean preservePublishOrder) { 145 OrderedMessageSender.configureOutboundChannel(this.clientOutboundChannel, preservePublishOrder); 146 this.preservePublishOrder = preservePublishOrder; 147 } 148 149 /** 150 * Whether to ensure messages are received in the order of publication. 151 * @since 5.1 152 */ 153 public boolean isPreservePublishOrder() { 154 return this.preservePublishOrder; 155 } 156 157 @Override 158 public void setApplicationEventPublisher(@Nullable ApplicationEventPublisher publisher) { 159 this.eventPublisher = publisher; 160 } 161 162 @Nullable 163 public ApplicationEventPublisher getApplicationEventPublisher() { 164 return this.eventPublisher; 165 } 166 167 public void setAutoStartup(boolean autoStartup) { 168 this.autoStartup = autoStartup; 169 } 170 171 @Override 172 public boolean isAutoStartup() { 173 return this.autoStartup; 174 } 175 176 177 @Override 178 public void start() { 179 synchronized (this.lifecycleMonitor) { 180 logger.info("Starting..."); 181 this.clientInboundChannel.subscribe(this); 182 this.brokerChannel.subscribe(this); 183 if (this.clientInboundChannel instanceof InterceptableChannel) { 184 ((InterceptableChannel) this.clientInboundChannel).addInterceptor(0, this.unsentDisconnectInterceptor); 185 } 186 startInternal(); 187 this.running = true; 188 logger.info("Started."); 189 } 190 } 191 192 protected void startInternal() { 193 } 194 195 @Override 196 public void stop() { 197 synchronized (this.lifecycleMonitor) { 198 logger.info("Stopping..."); 199 stopInternal(); 200 this.clientInboundChannel.unsubscribe(this); 201 this.brokerChannel.unsubscribe(this); 202 if (this.clientInboundChannel instanceof InterceptableChannel) { 203 ((InterceptableChannel) this.clientInboundChannel).removeInterceptor(this.unsentDisconnectInterceptor); 204 } 205 this.running = false; 206 logger.info("Stopped."); 207 } 208 } 209 210 protected void stopInternal() { 211 } 212 213 @Override 214 public final void stop(Runnable callback) { 215 synchronized (this.lifecycleMonitor) { 216 stop(); 217 callback.run(); 218 } 219 } 220 221 /** 222 * Check whether this message handler is currently running. 223 * <p>Note that even when this message handler is running the 224 * {@link #isBrokerAvailable()} flag may still independently alternate between 225 * being on and off depending on the concrete sub-class implementation. 226 */ 227 @Override 228 public final boolean isRunning() { 229 return this.running; 230 } 231 232 /** 233 * Whether the message broker is currently available and able to process messages. 234 * <p>Note that this is in addition to the {@link #isRunning()} flag, which 235 * indicates whether this message handler is running. In other words the message 236 * handler must first be running and then the {@code #isBrokerAvailable()} flag 237 * may still independently alternate between being on and off depending on the 238 * concrete sub-class implementation. 239 * <p>Application components may implement 240 * {@code org.springframework.context.ApplicationListener<BrokerAvailabilityEvent>} 241 * to receive notifications when broker becomes available and unavailable. 242 */ 243 public boolean isBrokerAvailable() { 244 return this.brokerAvailable.get(); 245 } 246 247 248 @Override 249 public void handleMessage(Message<?> message) { 250 if (!this.running) { 251 if (logger.isTraceEnabled()) { 252 logger.trace(this + " not running yet. Ignoring " + message); 253 } 254 return; 255 } 256 handleMessageInternal(message); 257 } 258 259 protected abstract void handleMessageInternal(Message<?> message); 260 261 262 protected boolean checkDestinationPrefix(@Nullable String destination) { 263 if (destination == null || CollectionUtils.isEmpty(this.destinationPrefixes)) { 264 return true; 265 } 266 for (String prefix : this.destinationPrefixes) { 267 if (destination.startsWith(prefix)) { 268 return true; 269 } 270 } 271 return false; 272 } 273 274 protected void publishBrokerAvailableEvent() { 275 boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true); 276 if (this.eventPublisher != null && shouldPublish) { 277 if (logger.isInfoEnabled()) { 278 logger.info(this.availableEvent); 279 } 280 this.eventPublisher.publishEvent(this.availableEvent); 281 } 282 } 283 284 protected void publishBrokerUnavailableEvent() { 285 boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false); 286 if (this.eventPublisher != null && shouldPublish) { 287 if (logger.isInfoEnabled()) { 288 logger.info(this.notAvailableEvent); 289 } 290 this.eventPublisher.publishEvent(this.notAvailableEvent); 291 } 292 } 293 294 /** 295 * Get the MessageChannel to use for sending messages to clients, possibly 296 * a per-session wrapper when {@code preservePublishOrder=true}. 297 * @since 5.1 298 */ 299 protected MessageChannel getClientOutboundChannelForSession(String sessionId) { 300 return this.preservePublishOrder ? 301 new OrderedMessageSender(getClientOutboundChannel(), logger) : getClientOutboundChannel(); 302 } 303 304 305 /** 306 * Detect unsent DISCONNECT messages and process them anyway. 307 */ 308 private class UnsentDisconnectChannelInterceptor implements ChannelInterceptor { 309 310 @Override 311 public void afterSendCompletion( 312 Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) { 313 314 if (!sent) { 315 SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(message.getHeaders()); 316 if (SimpMessageType.DISCONNECT.equals(messageType)) { 317 logger.debug("Detected unsent DISCONNECT message. Processing anyway."); 318 handleMessage(message); 319 } 320 } 321 } 322 } 323 324}